aboutsummaryrefslogtreecommitdiff
path: root/weed/server
diff options
context:
space:
mode:
Diffstat (limited to 'weed/server')
-rw-r--r--weed/server/master_grpc_server_volume.go7
-rw-r--r--weed/server/master_server_handlers_admin.go4
-rw-r--r--weed/server/volume_grpc_copy.go22
-rw-r--r--weed/server/volume_grpc_erasure_coding.go75
-rw-r--r--weed/server/volume_grpc_tail.go3
5 files changed, 77 insertions, 34 deletions
diff --git a/weed/server/master_grpc_server_volume.go b/weed/server/master_grpc_server_volume.go
index 8fc56e9b8..856c07890 100644
--- a/weed/server/master_grpc_server_volume.go
+++ b/weed/server/master_grpc_server_volume.go
@@ -5,10 +5,11 @@ import (
"fmt"
"github.com/chrislusf/raft"
+
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
"github.com/chrislusf/seaweedfs/weed/security"
- "github.com/chrislusf/seaweedfs/weed/storage"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
+ "github.com/chrislusf/seaweedfs/weed/storage/super_block"
"github.com/chrislusf/seaweedfs/weed/topology"
)
@@ -52,7 +53,7 @@ func (ms *MasterServer) Assign(ctx context.Context, req *master_pb.AssignRequest
if req.Replication == "" {
req.Replication = ms.option.DefaultReplicaPlacement
}
- replicaPlacement, err := storage.NewReplicaPlacementFromString(req.Replication)
+ replicaPlacement, err := super_block.NewReplicaPlacementFromString(req.Replication)
if err != nil {
return nil, err
}
@@ -108,7 +109,7 @@ func (ms *MasterServer) Statistics(ctx context.Context, req *master_pb.Statistic
if req.Replication == "" {
req.Replication = ms.option.DefaultReplicaPlacement
}
- replicaPlacement, err := storage.NewReplicaPlacementFromString(req.Replication)
+ replicaPlacement, err := super_block.NewReplicaPlacementFromString(req.Replication)
if err != nil {
return nil, err
}
diff --git a/weed/server/master_server_handlers_admin.go b/weed/server/master_server_handlers_admin.go
index a5d976008..2965a4863 100644
--- a/weed/server/master_server_handlers_admin.go
+++ b/weed/server/master_server_handlers_admin.go
@@ -10,9 +10,9 @@ import (
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/operation"
"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
- "github.com/chrislusf/seaweedfs/weed/storage"
"github.com/chrislusf/seaweedfs/weed/storage/backend/memory_map"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
+ "github.com/chrislusf/seaweedfs/weed/storage/super_block"
"github.com/chrislusf/seaweedfs/weed/topology"
"github.com/chrislusf/seaweedfs/weed/util"
)
@@ -145,7 +145,7 @@ func (ms *MasterServer) getVolumeGrowOption(r *http.Request) (*topology.VolumeGr
if replicationString == "" {
replicationString = ms.option.DefaultReplicaPlacement
}
- replicaPlacement, err := storage.NewReplicaPlacementFromString(replicationString)
+ replicaPlacement, err := super_block.NewReplicaPlacementFromString(replicationString)
if err != nil {
return nil, err
}
diff --git a/weed/server/volume_grpc_copy.go b/weed/server/volume_grpc_copy.go
index 711a3ebad..8a1ede1b8 100644
--- a/weed/server/volume_grpc_copy.go
+++ b/weed/server/volume_grpc_copy.go
@@ -55,11 +55,11 @@ func (vs *VolumeServer) VolumeCopy(ctx context.Context, req *volume_server_pb.Vo
// println("source:", volFileInfoResp.String())
// copy ecx file
- if err := vs.doCopyFile(ctx, client, false, req.Collection, req.VolumeId, volFileInfoResp.CompactionRevision, volFileInfoResp.IdxFileSize, volumeFileName, ".idx", false); err != nil {
+ if err := vs.doCopyFile(ctx, client, false, req.Collection, req.VolumeId, volFileInfoResp.CompactionRevision, volFileInfoResp.IdxFileSize, volumeFileName, ".idx", false, false); err != nil {
return err
}
- if err := vs.doCopyFile(ctx, client, false, req.Collection, req.VolumeId, volFileInfoResp.CompactionRevision, volFileInfoResp.DatFileSize, volumeFileName, ".dat", false); err != nil {
+ if err := vs.doCopyFile(ctx, client, false, req.Collection, req.VolumeId, volFileInfoResp.CompactionRevision, volFileInfoResp.DatFileSize, volumeFileName, ".dat", false, true); err != nil {
return err
}
@@ -95,15 +95,16 @@ func (vs *VolumeServer) VolumeCopy(ctx context.Context, req *volume_server_pb.Vo
}
func (vs *VolumeServer) doCopyFile(ctx context.Context, client volume_server_pb.VolumeServerClient, isEcVolume bool, collection string, vid uint32,
- compactRevision uint32, stopOffset uint64, baseFileName, ext string, isAppend bool) error {
+ compactRevision uint32, stopOffset uint64, baseFileName, ext string, isAppend bool, ignoreSourceFileNotFound bool) error {
copyFileClient, err := client.CopyFile(ctx, &volume_server_pb.CopyFileRequest{
- VolumeId: vid,
- Ext: ext,
- CompactionRevision: compactRevision,
- StopOffset: stopOffset,
- Collection: collection,
- IsEcVolume: isEcVolume,
+ VolumeId: vid,
+ Ext: ext,
+ CompactionRevision: compactRevision,
+ StopOffset: stopOffset,
+ Collection: collection,
+ IsEcVolume: isEcVolume,
+ IgnoreSourceFileNotFound: ignoreSourceFileNotFound,
})
if err != nil {
return fmt.Errorf("failed to start copying volume %d %s file: %v", vid, ext, err)
@@ -221,6 +222,9 @@ func (vs *VolumeServer) CopyFile(req *volume_server_pb.CopyFileRequest, stream v
file, err := os.Open(fileName)
if err != nil {
+ if req.IgnoreSourceFileNotFound && err == os.ErrNotExist {
+ return nil
+ }
return err
}
defer file.Close()
diff --git a/weed/server/volume_grpc_erasure_coding.go b/weed/server/volume_grpc_erasure_coding.go
index 6ded84cc3..8626cf344 100644
--- a/weed/server/volume_grpc_erasure_coding.go
+++ b/weed/server/volume_grpc_erasure_coding.go
@@ -8,6 +8,7 @@ import (
"math"
"os"
"path"
+ "path/filepath"
"strings"
"github.com/chrislusf/seaweedfs/weed/glog"
@@ -24,7 +25,7 @@ import (
Steps to apply erasure coding to .dat .idx files
0. ensure the volume is readonly
-1. client call VolumeEcShardsGenerate to generate the .ecx and .ec01~.ec14 files
+1. client call VolumeEcShardsGenerate to generate the .ecx and .ec00 ~ .ec13 files
2. client ask master for possible servers to hold the ec files, at least 4 servers
3. client call VolumeEcShardsCopy on above target servers to copy ec files from the source server
4. target servers report the new ec files to the master
@@ -33,7 +34,7 @@ Steps to apply erasure coding to .dat .idx files
*/
-// VolumeEcShardsGenerate generates the .ecx and .ec01 ~ .ec14 files
+// VolumeEcShardsGenerate generates the .ecx and .ec00 ~ .ec13 files
func (vs *VolumeServer) VolumeEcShardsGenerate(ctx context.Context, req *volume_server_pb.VolumeEcShardsGenerateRequest) (*volume_server_pb.VolumeEcShardsGenerateResponse, error) {
v := vs.store.GetVolume(needle.VolumeId(req.VolumeId))
@@ -51,7 +52,7 @@ func (vs *VolumeServer) VolumeEcShardsGenerate(ctx context.Context, req *volume_
return nil, fmt.Errorf("WriteSortedFileFromIdx %s: %v", baseFileName, err)
}
- // write .ec01 ~ .ec14 files
+ // write .ec00 ~ .ec13 files
if err := erasure_coding.WriteEcFiles(baseFileName); err != nil {
return nil, fmt.Errorf("WriteEcFiles %s: %v", baseFileName, err)
}
@@ -59,7 +60,7 @@ func (vs *VolumeServer) VolumeEcShardsGenerate(ctx context.Context, req *volume_
return &volume_server_pb.VolumeEcShardsGenerateResponse{}, nil
}
-// VolumeEcShardsRebuild generates the any of the missing .ec01 ~ .ec14 files
+// VolumeEcShardsRebuild generates the any of the missing .ec00 ~ .ec13 files
func (vs *VolumeServer) VolumeEcShardsRebuild(ctx context.Context, req *volume_server_pb.VolumeEcShardsRebuildRequest) (*volume_server_pb.VolumeEcShardsRebuildResponse, error) {
baseFileName := erasure_coding.EcShardBaseFileName(req.Collection, int(req.VolumeId))
@@ -68,7 +69,7 @@ func (vs *VolumeServer) VolumeEcShardsRebuild(ctx context.Context, req *volume_s
for _, location := range vs.store.Locations {
if util.FileExists(path.Join(location.Directory, baseFileName+".ecx")) {
- // write .ec01 ~ .ec14 files
+ // write .ec00 ~ .ec13 files
baseFileName = path.Join(location.Directory, baseFileName)
if generatedShardIds, err := erasure_coding.RebuildEcFiles(baseFileName); err != nil {
return nil, fmt.Errorf("RebuildEcFiles %s: %v", baseFileName, err)
@@ -103,25 +104,26 @@ func (vs *VolumeServer) VolumeEcShardsCopy(ctx context.Context, req *volume_serv
// copy ec data slices
for _, shardId := range req.ShardIds {
- if err := vs.doCopyFile(ctx, client, true, req.Collection, req.VolumeId, math.MaxUint32, math.MaxInt64, baseFileName, erasure_coding.ToExt(int(shardId)), false); err != nil {
+ if err := vs.doCopyFile(ctx, client, true, req.Collection, req.VolumeId, math.MaxUint32, math.MaxInt64, baseFileName, erasure_coding.ToExt(int(shardId)), false, false); err != nil {
return err
}
}
- if !req.CopyEcxFile {
- return nil
- }
+ if req.CopyEcxFile {
- // copy ecx file
- if err := vs.doCopyFile(ctx, client, true, req.Collection, req.VolumeId, math.MaxUint32, math.MaxInt64, baseFileName, ".ecx", false); err != nil {
- return err
+ // copy ecx file
+ if err := vs.doCopyFile(ctx, client, true, req.Collection, req.VolumeId, math.MaxUint32, math.MaxInt64, baseFileName, ".ecx", false, false); err != nil {
+ return err
+ }
+ return nil
}
- // copy ecj file
- if err := vs.doCopyFile(ctx, client, true, req.Collection, req.VolumeId, math.MaxUint32, math.MaxInt64, baseFileName, ".ecj", true); err != nil {
- return err
+ if req.CopyEcjFile {
+ // copy ecj file
+ if err := vs.doCopyFile(ctx, client, true, req.Collection, req.VolumeId, math.MaxUint32, math.MaxInt64, baseFileName, ".ecj", true, true); err != nil {
+ return err
+ }
}
-
return nil
})
if err != nil {
@@ -137,6 +139,8 @@ func (vs *VolumeServer) VolumeEcShardsDelete(ctx context.Context, req *volume_se
baseFilename := erasure_coding.EcShardBaseFileName(req.Collection, int(req.VolumeId))
+ glog.V(0).Infof("ec volume %d shard delete %v", req.VolumeId, req.ShardIds)
+
found := false
for _, location := range vs.store.Locations {
if util.FileExists(path.Join(location.Directory, baseFilename+".ecx")) {
@@ -153,21 +157,22 @@ func (vs *VolumeServer) VolumeEcShardsDelete(ctx context.Context, req *volume_se
return nil, nil
}
- // check whether to delete the ecx file also
+ // check whether to delete the .ecx and .ecj file also
hasEcxFile := false
existingShardCount := 0
+ bName := filepath.Base(baseFilename)
for _, location := range vs.store.Locations {
fileInfos, err := ioutil.ReadDir(location.Directory)
if err != nil {
continue
}
for _, fileInfo := range fileInfos {
- if fileInfo.Name() == baseFilename+".ecx" {
+ if fileInfo.Name() == bName+".ecx" || fileInfo.Name() == bName+".ecj"{
hasEcxFile = true
continue
}
- if strings.HasPrefix(fileInfo.Name(), baseFilename+".ec") {
+ if strings.HasPrefix(fileInfo.Name(), bName+".ec") {
existingShardCount++
}
}
@@ -317,3 +322,35 @@ func (vs *VolumeServer) VolumeEcBlobDelete(ctx context.Context, req *volume_serv
return resp, nil
}
+
+// VolumeEcShardsToVolume generates the .idx, .dat files from .ecx, .ecj and .ec01 ~ .ec14 files
+func (vs *VolumeServer) VolumeEcShardsToVolume(ctx context.Context, req *volume_server_pb.VolumeEcShardsToVolumeRequest) (*volume_server_pb.VolumeEcShardsToVolumeResponse, error) {
+
+ v, found := vs.store.FindEcVolume(needle.VolumeId(req.VolumeId))
+ if !found {
+ return nil, fmt.Errorf("ec volume %d not found", req.VolumeId)
+ }
+ baseFileName := v.FileName()
+
+ if v.Collection != req.Collection {
+ return nil, fmt.Errorf("existing collection:%v unexpected input: %v", v.Collection, req.Collection)
+ }
+
+ // calculate .dat file size
+ datFileSize, err := erasure_coding.FindDatFileSize(baseFileName)
+ if err != nil {
+ return nil, fmt.Errorf("FindDatFileSize %s: %v", baseFileName, err)
+ }
+
+ // write .dat file from .ec00 ~ .ec09 files
+ if err := erasure_coding.WriteDatFile(baseFileName, datFileSize); err != nil {
+ return nil, fmt.Errorf("WriteEcFiles %s: %v", baseFileName, err)
+ }
+
+ // write .idx file from .ecx and .ecj files
+ if err := erasure_coding.WriteIdxFileFromEcIndex(baseFileName); err != nil {
+ return nil, fmt.Errorf("WriteIdxFileFromEcIndex %s: %v", baseFileName, err)
+ }
+
+ return &volume_server_pb.VolumeEcShardsToVolumeResponse{}, nil
+}
diff --git a/weed/server/volume_grpc_tail.go b/weed/server/volume_grpc_tail.go
index 1bf61e1c7..c26d6ed8f 100644
--- a/weed/server/volume_grpc_tail.go
+++ b/weed/server/volume_grpc_tail.go
@@ -10,6 +10,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
"github.com/chrislusf/seaweedfs/weed/storage"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
+ "github.com/chrislusf/seaweedfs/weed/storage/super_block"
)
func (vs *VolumeServer) VolumeTailSender(req *volume_server_pb.VolumeTailSenderRequest, stream volume_server_pb.VolumeServer_VolumeTailSenderServer) error {
@@ -101,7 +102,7 @@ type VolumeFileScanner4Tailing struct {
lastProcessedTimestampNs uint64
}
-func (scanner *VolumeFileScanner4Tailing) VisitSuperBlock(superBlock storage.SuperBlock) error {
+func (scanner *VolumeFileScanner4Tailing) VisitSuperBlock(superBlock super_block.SuperBlock) error {
return nil
}