diff options
Diffstat (limited to 'weed/server')
| -rw-r--r-- | weed/server/master_grpc_server_volume.go | 7 | ||||
| -rw-r--r-- | weed/server/master_server_handlers_admin.go | 4 | ||||
| -rw-r--r-- | weed/server/volume_grpc_copy.go | 22 | ||||
| -rw-r--r-- | weed/server/volume_grpc_erasure_coding.go | 75 | ||||
| -rw-r--r-- | weed/server/volume_grpc_tail.go | 3 |
5 files changed, 77 insertions, 34 deletions
diff --git a/weed/server/master_grpc_server_volume.go b/weed/server/master_grpc_server_volume.go index 8fc56e9b8..856c07890 100644 --- a/weed/server/master_grpc_server_volume.go +++ b/weed/server/master_grpc_server_volume.go @@ -5,10 +5,11 @@ import ( "fmt" "github.com/chrislusf/raft" + "github.com/chrislusf/seaweedfs/weed/pb/master_pb" "github.com/chrislusf/seaweedfs/weed/security" - "github.com/chrislusf/seaweedfs/weed/storage" "github.com/chrislusf/seaweedfs/weed/storage/needle" + "github.com/chrislusf/seaweedfs/weed/storage/super_block" "github.com/chrislusf/seaweedfs/weed/topology" ) @@ -52,7 +53,7 @@ func (ms *MasterServer) Assign(ctx context.Context, req *master_pb.AssignRequest if req.Replication == "" { req.Replication = ms.option.DefaultReplicaPlacement } - replicaPlacement, err := storage.NewReplicaPlacementFromString(req.Replication) + replicaPlacement, err := super_block.NewReplicaPlacementFromString(req.Replication) if err != nil { return nil, err } @@ -108,7 +109,7 @@ func (ms *MasterServer) Statistics(ctx context.Context, req *master_pb.Statistic if req.Replication == "" { req.Replication = ms.option.DefaultReplicaPlacement } - replicaPlacement, err := storage.NewReplicaPlacementFromString(req.Replication) + replicaPlacement, err := super_block.NewReplicaPlacementFromString(req.Replication) if err != nil { return nil, err } diff --git a/weed/server/master_server_handlers_admin.go b/weed/server/master_server_handlers_admin.go index a5d976008..2965a4863 100644 --- a/weed/server/master_server_handlers_admin.go +++ b/weed/server/master_server_handlers_admin.go @@ -10,9 +10,9 @@ import ( "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/operation" "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb" - "github.com/chrislusf/seaweedfs/weed/storage" "github.com/chrislusf/seaweedfs/weed/storage/backend/memory_map" "github.com/chrislusf/seaweedfs/weed/storage/needle" + "github.com/chrislusf/seaweedfs/weed/storage/super_block" "github.com/chrislusf/seaweedfs/weed/topology" "github.com/chrislusf/seaweedfs/weed/util" ) @@ -145,7 +145,7 @@ func (ms *MasterServer) getVolumeGrowOption(r *http.Request) (*topology.VolumeGr if replicationString == "" { replicationString = ms.option.DefaultReplicaPlacement } - replicaPlacement, err := storage.NewReplicaPlacementFromString(replicationString) + replicaPlacement, err := super_block.NewReplicaPlacementFromString(replicationString) if err != nil { return nil, err } diff --git a/weed/server/volume_grpc_copy.go b/weed/server/volume_grpc_copy.go index 711a3ebad..8a1ede1b8 100644 --- a/weed/server/volume_grpc_copy.go +++ b/weed/server/volume_grpc_copy.go @@ -55,11 +55,11 @@ func (vs *VolumeServer) VolumeCopy(ctx context.Context, req *volume_server_pb.Vo // println("source:", volFileInfoResp.String()) // copy ecx file - if err := vs.doCopyFile(ctx, client, false, req.Collection, req.VolumeId, volFileInfoResp.CompactionRevision, volFileInfoResp.IdxFileSize, volumeFileName, ".idx", false); err != nil { + if err := vs.doCopyFile(ctx, client, false, req.Collection, req.VolumeId, volFileInfoResp.CompactionRevision, volFileInfoResp.IdxFileSize, volumeFileName, ".idx", false, false); err != nil { return err } - if err := vs.doCopyFile(ctx, client, false, req.Collection, req.VolumeId, volFileInfoResp.CompactionRevision, volFileInfoResp.DatFileSize, volumeFileName, ".dat", false); err != nil { + if err := vs.doCopyFile(ctx, client, false, req.Collection, req.VolumeId, volFileInfoResp.CompactionRevision, volFileInfoResp.DatFileSize, volumeFileName, ".dat", false, true); err != nil { return err } @@ -95,15 +95,16 @@ func (vs *VolumeServer) VolumeCopy(ctx context.Context, req *volume_server_pb.Vo } func (vs *VolumeServer) doCopyFile(ctx context.Context, client volume_server_pb.VolumeServerClient, isEcVolume bool, collection string, vid uint32, - compactRevision uint32, stopOffset uint64, baseFileName, ext string, isAppend bool) error { + compactRevision uint32, stopOffset uint64, baseFileName, ext string, isAppend bool, ignoreSourceFileNotFound bool) error { copyFileClient, err := client.CopyFile(ctx, &volume_server_pb.CopyFileRequest{ - VolumeId: vid, - Ext: ext, - CompactionRevision: compactRevision, - StopOffset: stopOffset, - Collection: collection, - IsEcVolume: isEcVolume, + VolumeId: vid, + Ext: ext, + CompactionRevision: compactRevision, + StopOffset: stopOffset, + Collection: collection, + IsEcVolume: isEcVolume, + IgnoreSourceFileNotFound: ignoreSourceFileNotFound, }) if err != nil { return fmt.Errorf("failed to start copying volume %d %s file: %v", vid, ext, err) @@ -221,6 +222,9 @@ func (vs *VolumeServer) CopyFile(req *volume_server_pb.CopyFileRequest, stream v file, err := os.Open(fileName) if err != nil { + if req.IgnoreSourceFileNotFound && err == os.ErrNotExist { + return nil + } return err } defer file.Close() diff --git a/weed/server/volume_grpc_erasure_coding.go b/weed/server/volume_grpc_erasure_coding.go index 6ded84cc3..8626cf344 100644 --- a/weed/server/volume_grpc_erasure_coding.go +++ b/weed/server/volume_grpc_erasure_coding.go @@ -8,6 +8,7 @@ import ( "math" "os" "path" + "path/filepath" "strings" "github.com/chrislusf/seaweedfs/weed/glog" @@ -24,7 +25,7 @@ import ( Steps to apply erasure coding to .dat .idx files 0. ensure the volume is readonly -1. client call VolumeEcShardsGenerate to generate the .ecx and .ec01~.ec14 files +1. client call VolumeEcShardsGenerate to generate the .ecx and .ec00 ~ .ec13 files 2. client ask master for possible servers to hold the ec files, at least 4 servers 3. client call VolumeEcShardsCopy on above target servers to copy ec files from the source server 4. target servers report the new ec files to the master @@ -33,7 +34,7 @@ Steps to apply erasure coding to .dat .idx files */ -// VolumeEcShardsGenerate generates the .ecx and .ec01 ~ .ec14 files +// VolumeEcShardsGenerate generates the .ecx and .ec00 ~ .ec13 files func (vs *VolumeServer) VolumeEcShardsGenerate(ctx context.Context, req *volume_server_pb.VolumeEcShardsGenerateRequest) (*volume_server_pb.VolumeEcShardsGenerateResponse, error) { v := vs.store.GetVolume(needle.VolumeId(req.VolumeId)) @@ -51,7 +52,7 @@ func (vs *VolumeServer) VolumeEcShardsGenerate(ctx context.Context, req *volume_ return nil, fmt.Errorf("WriteSortedFileFromIdx %s: %v", baseFileName, err) } - // write .ec01 ~ .ec14 files + // write .ec00 ~ .ec13 files if err := erasure_coding.WriteEcFiles(baseFileName); err != nil { return nil, fmt.Errorf("WriteEcFiles %s: %v", baseFileName, err) } @@ -59,7 +60,7 @@ func (vs *VolumeServer) VolumeEcShardsGenerate(ctx context.Context, req *volume_ return &volume_server_pb.VolumeEcShardsGenerateResponse{}, nil } -// VolumeEcShardsRebuild generates the any of the missing .ec01 ~ .ec14 files +// VolumeEcShardsRebuild generates the any of the missing .ec00 ~ .ec13 files func (vs *VolumeServer) VolumeEcShardsRebuild(ctx context.Context, req *volume_server_pb.VolumeEcShardsRebuildRequest) (*volume_server_pb.VolumeEcShardsRebuildResponse, error) { baseFileName := erasure_coding.EcShardBaseFileName(req.Collection, int(req.VolumeId)) @@ -68,7 +69,7 @@ func (vs *VolumeServer) VolumeEcShardsRebuild(ctx context.Context, req *volume_s for _, location := range vs.store.Locations { if util.FileExists(path.Join(location.Directory, baseFileName+".ecx")) { - // write .ec01 ~ .ec14 files + // write .ec00 ~ .ec13 files baseFileName = path.Join(location.Directory, baseFileName) if generatedShardIds, err := erasure_coding.RebuildEcFiles(baseFileName); err != nil { return nil, fmt.Errorf("RebuildEcFiles %s: %v", baseFileName, err) @@ -103,25 +104,26 @@ func (vs *VolumeServer) VolumeEcShardsCopy(ctx context.Context, req *volume_serv // copy ec data slices for _, shardId := range req.ShardIds { - if err := vs.doCopyFile(ctx, client, true, req.Collection, req.VolumeId, math.MaxUint32, math.MaxInt64, baseFileName, erasure_coding.ToExt(int(shardId)), false); err != nil { + if err := vs.doCopyFile(ctx, client, true, req.Collection, req.VolumeId, math.MaxUint32, math.MaxInt64, baseFileName, erasure_coding.ToExt(int(shardId)), false, false); err != nil { return err } } - if !req.CopyEcxFile { - return nil - } + if req.CopyEcxFile { - // copy ecx file - if err := vs.doCopyFile(ctx, client, true, req.Collection, req.VolumeId, math.MaxUint32, math.MaxInt64, baseFileName, ".ecx", false); err != nil { - return err + // copy ecx file + if err := vs.doCopyFile(ctx, client, true, req.Collection, req.VolumeId, math.MaxUint32, math.MaxInt64, baseFileName, ".ecx", false, false); err != nil { + return err + } + return nil } - // copy ecj file - if err := vs.doCopyFile(ctx, client, true, req.Collection, req.VolumeId, math.MaxUint32, math.MaxInt64, baseFileName, ".ecj", true); err != nil { - return err + if req.CopyEcjFile { + // copy ecj file + if err := vs.doCopyFile(ctx, client, true, req.Collection, req.VolumeId, math.MaxUint32, math.MaxInt64, baseFileName, ".ecj", true, true); err != nil { + return err + } } - return nil }) if err != nil { @@ -137,6 +139,8 @@ func (vs *VolumeServer) VolumeEcShardsDelete(ctx context.Context, req *volume_se baseFilename := erasure_coding.EcShardBaseFileName(req.Collection, int(req.VolumeId)) + glog.V(0).Infof("ec volume %d shard delete %v", req.VolumeId, req.ShardIds) + found := false for _, location := range vs.store.Locations { if util.FileExists(path.Join(location.Directory, baseFilename+".ecx")) { @@ -153,21 +157,22 @@ func (vs *VolumeServer) VolumeEcShardsDelete(ctx context.Context, req *volume_se return nil, nil } - // check whether to delete the ecx file also + // check whether to delete the .ecx and .ecj file also hasEcxFile := false existingShardCount := 0 + bName := filepath.Base(baseFilename) for _, location := range vs.store.Locations { fileInfos, err := ioutil.ReadDir(location.Directory) if err != nil { continue } for _, fileInfo := range fileInfos { - if fileInfo.Name() == baseFilename+".ecx" { + if fileInfo.Name() == bName+".ecx" || fileInfo.Name() == bName+".ecj"{ hasEcxFile = true continue } - if strings.HasPrefix(fileInfo.Name(), baseFilename+".ec") { + if strings.HasPrefix(fileInfo.Name(), bName+".ec") { existingShardCount++ } } @@ -317,3 +322,35 @@ func (vs *VolumeServer) VolumeEcBlobDelete(ctx context.Context, req *volume_serv return resp, nil } + +// VolumeEcShardsToVolume generates the .idx, .dat files from .ecx, .ecj and .ec01 ~ .ec14 files +func (vs *VolumeServer) VolumeEcShardsToVolume(ctx context.Context, req *volume_server_pb.VolumeEcShardsToVolumeRequest) (*volume_server_pb.VolumeEcShardsToVolumeResponse, error) { + + v, found := vs.store.FindEcVolume(needle.VolumeId(req.VolumeId)) + if !found { + return nil, fmt.Errorf("ec volume %d not found", req.VolumeId) + } + baseFileName := v.FileName() + + if v.Collection != req.Collection { + return nil, fmt.Errorf("existing collection:%v unexpected input: %v", v.Collection, req.Collection) + } + + // calculate .dat file size + datFileSize, err := erasure_coding.FindDatFileSize(baseFileName) + if err != nil { + return nil, fmt.Errorf("FindDatFileSize %s: %v", baseFileName, err) + } + + // write .dat file from .ec00 ~ .ec09 files + if err := erasure_coding.WriteDatFile(baseFileName, datFileSize); err != nil { + return nil, fmt.Errorf("WriteEcFiles %s: %v", baseFileName, err) + } + + // write .idx file from .ecx and .ecj files + if err := erasure_coding.WriteIdxFileFromEcIndex(baseFileName); err != nil { + return nil, fmt.Errorf("WriteIdxFileFromEcIndex %s: %v", baseFileName, err) + } + + return &volume_server_pb.VolumeEcShardsToVolumeResponse{}, nil +} diff --git a/weed/server/volume_grpc_tail.go b/weed/server/volume_grpc_tail.go index 1bf61e1c7..c26d6ed8f 100644 --- a/weed/server/volume_grpc_tail.go +++ b/weed/server/volume_grpc_tail.go @@ -10,6 +10,7 @@ import ( "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb" "github.com/chrislusf/seaweedfs/weed/storage" "github.com/chrislusf/seaweedfs/weed/storage/needle" + "github.com/chrislusf/seaweedfs/weed/storage/super_block" ) func (vs *VolumeServer) VolumeTailSender(req *volume_server_pb.VolumeTailSenderRequest, stream volume_server_pb.VolumeServer_VolumeTailSenderServer) error { @@ -101,7 +102,7 @@ type VolumeFileScanner4Tailing struct { lastProcessedTimestampNs uint64 } -func (scanner *VolumeFileScanner4Tailing) VisitSuperBlock(superBlock storage.SuperBlock) error { +func (scanner *VolumeFileScanner4Tailing) VisitSuperBlock(superBlock super_block.SuperBlock) error { return nil } |
