aboutsummaryrefslogtreecommitdiff
path: root/weed/server
diff options
context:
space:
mode:
Diffstat (limited to 'weed/server')
-rw-r--r--weed/server/filer_grpc_server.go3
-rw-r--r--weed/server/filer_grpc_server_remote.go26
-rw-r--r--weed/server/volume_grpc_erasure_coding.go4
-rw-r--r--weed/server/volume_grpc_remote.go17
4 files changed, 23 insertions, 27 deletions
diff --git a/weed/server/filer_grpc_server.go b/weed/server/filer_grpc_server.go
index 08b01dd09..e025e73dc 100644
--- a/weed/server/filer_grpc_server.go
+++ b/weed/server/filer_grpc_server.go
@@ -384,6 +384,8 @@ func (fs *FilerServer) Statistics(ctx context.Context, req *filer_pb.StatisticsR
func (fs *FilerServer) GetFilerConfiguration(ctx context.Context, req *filer_pb.GetFilerConfigurationRequest) (resp *filer_pb.GetFilerConfigurationResponse, err error) {
+ clusterId, _ := fs.filer.Store.KvGet(context.Background(), []byte("clusterId"))
+
t := &filer_pb.GetFilerConfigurationResponse{
Masters: fs.option.Masters,
Collection: fs.option.Collection,
@@ -395,6 +397,7 @@ func (fs *FilerServer) GetFilerConfiguration(ctx context.Context, req *filer_pb.
MetricsAddress: fs.metricsAddress,
MetricsIntervalSec: int32(fs.metricsIntervalSec),
Version: util.Version(),
+ ClusterId: string(clusterId),
}
glog.V(4).Infof("GetFilerConfiguration: %v", t)
diff --git a/weed/server/filer_grpc_server_remote.go b/weed/server/filer_grpc_server_remote.go
index 8049f4f46..8064431c5 100644
--- a/weed/server/filer_grpc_server_remote.go
+++ b/weed/server/filer_grpc_server_remote.go
@@ -6,11 +6,13 @@ import (
"github.com/chrislusf/seaweedfs/weed/filer"
"github.com/chrislusf/seaweedfs/weed/operation"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/pb/remote_pb"
"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
"github.com/chrislusf/seaweedfs/weed/util"
"github.com/golang/protobuf/proto"
"strings"
+ "sync"
"time"
)
@@ -27,7 +29,7 @@ func (fs *FilerServer) DownloadToLocal(ctx context.Context, req *filer_pb.Downlo
}
// find mapping
- var remoteStorageMountedLocation *filer_pb.RemoteStorageLocation
+ var remoteStorageMountedLocation *remote_pb.RemoteStorageLocation
var localMountedDir string
for k, loc := range mappings.Mappings {
if strings.HasPrefix(req.Directory, k) {
@@ -43,7 +45,7 @@ func (fs *FilerServer) DownloadToLocal(ctx context.Context, req *filer_pb.Downlo
if err != nil {
return nil, err
}
- storageConf := &filer_pb.RemoteConf{}
+ storageConf := &remote_pb.RemoteConf{}
if unMarshalErr := proto.Unmarshal(storageConfEntry.Content, storageConf); unMarshalErr != nil {
return nil, fmt.Errorf("unmarshal remote storage conf %s/%s: %v", filer.DirectoryEtcRemote, remoteStorageMountedLocation.Name+filer.REMOTE_STORAGE_CONF_SUFFIX, unMarshalErr)
}
@@ -79,12 +81,15 @@ func (fs *FilerServer) DownloadToLocal(ctx context.Context, req *filer_pb.Downlo
var chunks []*filer_pb.FileChunk
var fetchAndWriteErr error
+ var wg sync.WaitGroup
limitedConcurrentExecutor := util.NewLimitedConcurrentExecutor(8)
for offset := int64(0); offset < entry.Remote.RemoteSize; offset += chunkSize {
localOffset := offset
+ wg.Add(1)
limitedConcurrentExecutor.Execute(func() {
+ defer wg.Done()
size := chunkSize
if localOffset+chunkSize > entry.Remote.RemoteSize {
size = entry.Remote.RemoteSize - localOffset
@@ -114,14 +119,12 @@ func (fs *FilerServer) DownloadToLocal(ctx context.Context, req *filer_pb.Downlo
Cookie: uint32(fileId.Cookie),
Offset: localOffset,
Size: size,
- RemoteType: storageConf.Type,
- RemoteName: storageConf.Name,
- S3AccessKey: storageConf.S3AccessKey,
- S3SecretKey: storageConf.S3SecretKey,
- S3Region: storageConf.S3Region,
- S3Endpoint: storageConf.S3Endpoint,
- RemoteBucket: remoteStorageMountedLocation.Bucket,
- RemotePath: string(dest),
+ RemoteConf: storageConf,
+ RemoteLocation: &remote_pb.RemoteStorageLocation{
+ Name: remoteStorageMountedLocation.Name,
+ Bucket: remoteStorageMountedLocation.Bucket,
+ Path: string(dest),
+ },
})
if fetchAndWriteErr != nil {
return fmt.Errorf("volume server %s fetchAndWrite %s: %v", assignResult.Url, dest, fetchAndWriteErr)
@@ -129,7 +132,7 @@ func (fs *FilerServer) DownloadToLocal(ctx context.Context, req *filer_pb.Downlo
return nil
})
- if err != nil {
+ if err != nil && fetchAndWriteErr == nil {
fetchAndWriteErr = err
return
}
@@ -148,6 +151,7 @@ func (fs *FilerServer) DownloadToLocal(ctx context.Context, req *filer_pb.Downlo
})
}
+ wg.Wait()
if fetchAndWriteErr != nil {
return nil, fetchAndWriteErr
}
diff --git a/weed/server/volume_grpc_erasure_coding.go b/weed/server/volume_grpc_erasure_coding.go
index 452c2766e..d7e4f302a 100644
--- a/weed/server/volume_grpc_erasure_coding.go
+++ b/weed/server/volume_grpc_erasure_coding.go
@@ -3,6 +3,7 @@ package weed_server
import (
"context"
"fmt"
+ "github.com/chrislusf/seaweedfs/weed/storage/volume_info"
"io"
"io/ioutil"
"math"
@@ -12,7 +13,6 @@ import (
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/operation"
- "github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
"github.com/chrislusf/seaweedfs/weed/storage"
"github.com/chrislusf/seaweedfs/weed/storage/erasure_coding"
@@ -60,7 +60,7 @@ func (vs *VolumeServer) VolumeEcShardsGenerate(ctx context.Context, req *volume_
}
// write .vif files
- if err := pb.SaveVolumeInfo(baseFileName+".vif", &volume_server_pb.VolumeInfo{Version: uint32(v.Version())}); err != nil {
+ if err := volume_info.SaveVolumeInfo(baseFileName+".vif", &volume_server_pb.VolumeInfo{Version: uint32(v.Version())}); err != nil {
return nil, fmt.Errorf("WriteEcFiles %s: %v", baseFileName, err)
}
diff --git a/weed/server/volume_grpc_remote.go b/weed/server/volume_grpc_remote.go
index 5ca6619bd..0615a96a1 100644
--- a/weed/server/volume_grpc_remote.go
+++ b/weed/server/volume_grpc_remote.go
@@ -3,7 +3,6 @@ package weed_server
import (
"context"
"fmt"
- "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
"github.com/chrislusf/seaweedfs/weed/remote_storage"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
@@ -17,25 +16,15 @@ func (vs *VolumeServer) FetchAndWriteNeedle(ctx context.Context, req *volume_ser
return nil, fmt.Errorf("not found volume id %d", req.VolumeId)
}
- remoteConf := &filer_pb.RemoteConf{
- Type: req.RemoteType,
- Name: req.RemoteName,
- S3AccessKey: req.S3AccessKey,
- S3SecretKey: req.S3SecretKey,
- S3Region: req.S3Region,
- S3Endpoint: req.S3Endpoint,
- }
+ remoteConf := req.RemoteConf
client, getClientErr := remote_storage.GetRemoteStorage(remoteConf)
if getClientErr != nil {
return nil, fmt.Errorf("get remote client: %v", getClientErr)
}
- remoteStorageLocation := &filer_pb.RemoteStorageLocation{
- Name: req.RemoteName,
- Bucket: req.RemoteBucket,
- Path: req.RemotePath,
- }
+ remoteStorageLocation := req.RemoteLocation
+
data, ReadRemoteErr := client.ReadFile(remoteStorageLocation, req.Offset, req.Size)
if ReadRemoteErr != nil {
return nil, fmt.Errorf("read from remote %+v: %v", remoteStorageLocation, ReadRemoteErr)