aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2019-06-19 22:57:14 -0700
committerChris Lu <chris.lu@gmail.com>2019-06-19 22:57:14 -0700
commit856da7aae2adefe7c25f68c792f9ed03977a4a0e (patch)
tree6a9b8dafc533d4e8f2243fb80427aee807ca6b86
parent115558e5f529f119ca4ba60a45a28fc7bb8d25ae (diff)
downloadseaweedfs-856da7aae2adefe7c25f68c792f9ed03977a4a0e.tar.xz
seaweedfs-856da7aae2adefe7c25f68c792f9ed03977a4a0e.zip
ec volume support deletes
-rw-r--r--weed/server/volume_grpc_copy.go16
-rw-r--r--weed/server/volume_grpc_erasure_coding.go17
-rw-r--r--weed/storage/erasure_coding/ec_encoder.go6
-rw-r--r--weed/storage/erasure_coding/ec_volume.go38
-rw-r--r--weed/storage/erasure_coding/ec_volume_delete.go98
-rw-r--r--weed/storage/store_ec.go4
6 files changed, 162 insertions, 17 deletions
diff --git a/weed/server/volume_grpc_copy.go b/weed/server/volume_grpc_copy.go
index 89788d3c3..1ec69b43f 100644
--- a/weed/server/volume_grpc_copy.go
+++ b/weed/server/volume_grpc_copy.go
@@ -55,11 +55,11 @@ func (vs *VolumeServer) VolumeCopy(ctx context.Context, req *volume_server_pb.Vo
// println("source:", volFileInfoResp.String())
// copy ecx file
- if err := vs.doCopyFile(ctx, client, false, req.Collection, req.VolumeId, volFileInfoResp.CompactionRevision, volFileInfoResp.IdxFileSize, volumeFileName, ".idx"); err != nil {
+ if err := vs.doCopyFile(ctx, client, false, req.Collection, req.VolumeId, volFileInfoResp.CompactionRevision, volFileInfoResp.IdxFileSize, volumeFileName, ".idx", false); err != nil {
return err
}
- if err := vs.doCopyFile(ctx, client, false, req.Collection, req.VolumeId, volFileInfoResp.CompactionRevision, volFileInfoResp.DatFileSize, volumeFileName, ".dat"); err != nil {
+ if err := vs.doCopyFile(ctx, client, false, req.Collection, req.VolumeId, volFileInfoResp.CompactionRevision, volFileInfoResp.DatFileSize, volumeFileName, ".dat", false); err != nil {
return err
}
@@ -95,7 +95,7 @@ func (vs *VolumeServer) VolumeCopy(ctx context.Context, req *volume_server_pb.Vo
}
func (vs *VolumeServer) doCopyFile(ctx context.Context, client volume_server_pb.VolumeServerClient, isEcVolume bool, collection string, vid uint32,
- compactRevision uint32, stopOffset uint64, baseFileName, ext string) error {
+ compactRevision uint32, stopOffset uint64, baseFileName, ext string, isAppend bool) error {
copyFileClient, err := client.CopyFile(ctx, &volume_server_pb.CopyFileRequest{
VolumeId: vid,
@@ -109,7 +109,7 @@ func (vs *VolumeServer) doCopyFile(ctx context.Context, client volume_server_pb.
return fmt.Errorf("failed to start copying volume %d %s file: %v", vid, ext, err)
}
- err = writeToFile(copyFileClient, baseFileName+ext, util.NewWriteThrottler(vs.compactionBytePerSecond))
+ err = writeToFile(copyFileClient, baseFileName+ext, util.NewWriteThrottler(vs.compactionBytePerSecond), isAppend)
if err != nil {
return fmt.Errorf("failed to copy %s file: %v", baseFileName+ext, err)
}
@@ -143,9 +143,13 @@ func checkCopyFiles(originFileInf *volume_server_pb.ReadVolumeFileStatusResponse
return nil
}
-func writeToFile(client volume_server_pb.VolumeServer_CopyFileClient, fileName string, wt *util.WriteThrottler) error {
+func writeToFile(client volume_server_pb.VolumeServer_CopyFileClient, fileName string, wt *util.WriteThrottler, isAppend bool) error {
glog.V(4).Infof("writing to %s", fileName)
- dst, err := os.OpenFile(fileName, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644)
+ flags := os.O_WRONLY|os.O_CREATE|os.O_TRUNC
+ if isAppend {
+ flags = os.O_WRONLY|os.O_CREATE
+ }
+ dst, err := os.OpenFile(fileName, flags, 0644)
if err != nil {
return nil
}
diff --git a/weed/server/volume_grpc_erasure_coding.go b/weed/server/volume_grpc_erasure_coding.go
index ab1310c4a..e676337e6 100644
--- a/weed/server/volume_grpc_erasure_coding.go
+++ b/weed/server/volume_grpc_erasure_coding.go
@@ -74,6 +74,11 @@ func (vs *VolumeServer) VolumeEcShardsRebuild(ctx context.Context, req *volume_s
} else {
rebuiltShardIds = generatedShardIds
}
+
+ if err := erasure_coding.RebuildEcxFile(baseFileName); err != nil {
+ return nil, fmt.Errorf("RebuildEcxFile %s: %v", baseFileName, err)
+ }
+
break
}
}
@@ -97,7 +102,7 @@ func (vs *VolumeServer) VolumeEcShardsCopy(ctx context.Context, req *volume_serv
// copy ec data slices
for _, shardId := range req.ShardIds {
- if err := vs.doCopyFile(ctx, client, true, req.Collection, req.VolumeId, math.MaxUint32, math.MaxInt64, baseFileName, erasure_coding.ToExt(int(shardId))); err != nil {
+ if err := vs.doCopyFile(ctx, client, true, req.Collection, req.VolumeId, math.MaxUint32, math.MaxInt64, baseFileName, erasure_coding.ToExt(int(shardId)), false); err != nil {
return err
}
}
@@ -107,7 +112,12 @@ func (vs *VolumeServer) VolumeEcShardsCopy(ctx context.Context, req *volume_serv
}
// copy ecx file
- if err := vs.doCopyFile(ctx, client, true, req.Collection, req.VolumeId, math.MaxUint32, math.MaxInt64, baseFileName, ".ecx"); err != nil {
+ if err := vs.doCopyFile(ctx, client, true, req.Collection, req.VolumeId, math.MaxUint32, math.MaxInt64, baseFileName, ".ecx", false); err != nil {
+ return err
+ }
+
+ // copy ecj file
+ if err := vs.doCopyFile(ctx, client, true, req.Collection, req.VolumeId, math.MaxUint32, math.MaxInt64, baseFileName, ".ecj", true); err != nil {
return err
}
@@ -166,6 +176,9 @@ func (vs *VolumeServer) VolumeEcShardsDelete(ctx context.Context, req *volume_se
if err := os.Remove(baseFilename + ".ecx"); err != nil {
return nil, err
}
+ if err := os.Remove(baseFilename + ".ecj"); err != nil {
+ return nil, err
+ }
}
return &volume_server_pb.VolumeEcShardsDeleteResponse{}, nil
diff --git a/weed/storage/erasure_coding/ec_encoder.go b/weed/storage/erasure_coding/ec_encoder.go
index 26130b4ba..97010a1ed 100644
--- a/weed/storage/erasure_coding/ec_encoder.go
+++ b/weed/storage/erasure_coding/ec_encoder.go
@@ -32,7 +32,7 @@ func WriteSortedEcxFile(baseFileName string) (e error) {
ecxFile, err := os.OpenFile(baseFileName+".ecx", os.O_TRUNC|os.O_CREATE|os.O_WRONLY, 0644)
if err != nil {
- return fmt.Errorf("failed to open dat file: %v", err)
+ return fmt.Errorf("failed to open ecx file: %v", err)
}
defer ecxFile.Close()
@@ -43,7 +43,7 @@ func WriteSortedEcxFile(baseFileName string) (e error) {
})
if err != nil {
- return fmt.Errorf("failed to open dat file: %v", err)
+ return fmt.Errorf("failed to visit ecx file: %v", err)
}
return nil
@@ -202,7 +202,7 @@ func encodeDatFile(remainingSize int64, err error, baseFileName string, bufferSi
outputs, err := openEcFiles(baseFileName, false)
defer closeEcFiles(outputs)
if err != nil {
- return fmt.Errorf("failed to open dat file: %v", err)
+ return fmt.Errorf("failed to open ec files %s: %v", baseFileName, err)
}
for remainingSize > largeBlockSize*DataShardsCount {
diff --git a/weed/storage/erasure_coding/ec_volume.go b/weed/storage/erasure_coding/ec_volume.go
index aea53f36e..acc9b1c37 100644
--- a/weed/storage/erasure_coding/ec_volume.go
+++ b/weed/storage/erasure_coding/ec_volume.go
@@ -1,6 +1,7 @@
package erasure_coding
import (
+ "errors"
"fmt"
"math"
"os"
@@ -14,6 +15,10 @@ import (
"github.com/chrislusf/seaweedfs/weed/storage/types"
)
+var (
+ NotFoundError = errors.New("needle not found")
+)
+
type EcVolume struct {
VolumeId needle.VolumeId
Collection string
@@ -26,6 +31,8 @@ type EcVolume struct {
ShardLocationsRefreshTime time.Time
ShardLocationsLock sync.RWMutex
Version needle.Version
+ ecjFile *os.File
+ ecjFileAccessLock sync.Mutex
}
func NewEcVolume(dir string, collection string, vid needle.VolumeId) (ev *EcVolume, err error) {
@@ -34,8 +41,8 @@ func NewEcVolume(dir string, collection string, vid needle.VolumeId) (ev *EcVolu
baseFileName := EcShardFileName(collection, dir, int(vid))
// open ecx file
- if ev.ecxFile, err = os.OpenFile(baseFileName+".ecx", os.O_RDONLY, 0644); err != nil {
- return nil, fmt.Errorf("cannot read ec volume index %s.ecx: %v", baseFileName, err)
+ if ev.ecxFile, err = os.OpenFile(baseFileName+".ecx", os.O_RDWR, 0644); err != nil {
+ return nil, fmt.Errorf("cannot open ec volume index %s.ecx: %v", baseFileName, err)
}
ecxFi, statErr := ev.ecxFile.Stat()
if statErr != nil {
@@ -44,6 +51,11 @@ func NewEcVolume(dir string, collection string, vid needle.VolumeId) (ev *EcVolu
ev.ecxFileSize = ecxFi.Size()
ev.ecxCreatedAt = ecxFi.ModTime()
+ // open ecj file
+ if ev.ecjFile, err = os.OpenFile(baseFileName+".ecj", os.O_RDWR|os.O_CREATE, 0644); err != nil {
+ return nil, fmt.Errorf("cannot open ec volume journal %s.ecj: %v", baseFileName, err)
+ }
+
ev.ShardLocations = make(map[ShardId][]string)
return
@@ -93,6 +105,12 @@ func (ev *EcVolume) Close() {
for _, s := range ev.Shards {
s.Close()
}
+ if ev.ecjFile != nil {
+ ev.ecjFileAccessLock.Lock()
+ _ = ev.ecjFile.Close()
+ ev.ecjFile = nil
+ ev.ecjFileAccessLock.Unlock()
+ }
if ev.ecxFile != nil {
_ = ev.ecxFile.Close()
ev.ecxFile = nil
@@ -107,6 +125,7 @@ func (ev *EcVolume) Destroy() {
s.Destroy()
}
os.Remove(ev.FileName() + ".ecx")
+ os.Remove(ev.FileName() + ".ecj")
}
func (ev *EcVolume) FileName() string {
@@ -167,16 +186,23 @@ func (ev *EcVolume) LocateEcShardNeedle(n *needle.Needle, version needle.Version
}
func (ev *EcVolume) findNeedleFromEcx(needleId types.NeedleId) (offset types.Offset, size uint32, err error) {
+ return searchNeedleFromEcx(ev.ecxFile, ev.ecxFileSize, needleId, nil)
+}
+
+func searchNeedleFromEcx(ecxFile *os.File, ecxFileSize int64, needleId types.NeedleId, processNeedleFn func(file *os.File, offset int64) error) (offset types.Offset, size uint32, err error) {
var key types.NeedleId
buf := make([]byte, types.NeedleMapEntrySize)
- l, h := int64(0), ev.ecxFileSize/types.NeedleMapEntrySize
+ l, h := int64(0), ecxFileSize/types.NeedleMapEntrySize
for l < h {
m := (l + h) / 2
- if _, err := ev.ecxFile.ReadAt(buf, m*types.NeedleMapEntrySize); err != nil {
- return types.Offset{}, 0, fmt.Errorf("ecx file %d read at %d: %v", ev.ecxFileSize, m*types.NeedleMapEntrySize, err)
+ if _, err := ecxFile.ReadAt(buf, m*types.NeedleMapEntrySize); err != nil {
+ return types.Offset{}, types.TombstoneFileSize, fmt.Errorf("ecx file %d read at %d: %v", ecxFileSize, m*types.NeedleMapEntrySize, err)
}
key, offset, size = idx.IdxFileEntry(buf)
if key == needleId {
+ if processNeedleFn != nil {
+ err = processNeedleFn(ecxFile, m*types.NeedleHeaderSize)
+ }
return
}
if key < needleId {
@@ -186,6 +212,6 @@ func (ev *EcVolume) findNeedleFromEcx(needleId types.NeedleId) (offset types.Off
}
}
- err = fmt.Errorf("needle id %d not found", needleId)
+ err = NotFoundError
return
}
diff --git a/weed/storage/erasure_coding/ec_volume_delete.go b/weed/storage/erasure_coding/ec_volume_delete.go
new file mode 100644
index 000000000..784dc2854
--- /dev/null
+++ b/weed/storage/erasure_coding/ec_volume_delete.go
@@ -0,0 +1,98 @@
+package erasure_coding
+
+import (
+ "fmt"
+ "io"
+ "os"
+
+ "github.com/chrislusf/seaweedfs/weed/storage/types"
+ "github.com/chrislusf/seaweedfs/weed/util"
+)
+
+var (
+ markNeedleDeleted = func(file *os.File, offset int64) error {
+ b := make([]byte, types.SizeSize)
+ util.Uint32toBytes(b, types.TombstoneFileSize)
+ n, err := file.WriteAt(b, offset+types.NeedleIdSize+types.OffsetSize)
+ if err != nil {
+ return fmt.Errorf("ecx write error: %v", err)
+ }
+ if n != types.SizeSize {
+ return fmt.Errorf("ecx written %d bytes, expecting %d", n, types.SizeSize)
+ }
+ return nil
+ }
+)
+
+func (ev *EcVolume) deleteNeedleFromEcx(needleId types.NeedleId) (err error) {
+
+ _, _, err = searchNeedleFromEcx(ev.ecxFile, ev.ecxFileSize, needleId, markNeedleDeleted)
+
+ if err != nil {
+ if err == NotFoundError {
+ return nil
+ }
+ return err
+ }
+
+ b := make([]byte, types.NeedleIdSize)
+ types.NeedleIdToBytes(b, needleId)
+
+ ev.ecjFileAccessLock.Lock()
+
+ ev.ecjFile.Seek(0, io.SeekEnd)
+ ev.ecjFile.Write(b)
+
+ ev.ecjFileAccessLock.Unlock()
+
+ return
+}
+
+func RebuildEcxFile(baseFileName string) error {
+
+ if !util.FileExists(baseFileName + ".ecj") {
+ return nil
+ }
+
+ ecxFile, err := os.OpenFile(baseFileName+".ecx", os.O_RDWR, 0644)
+ if err != nil {
+ return fmt.Errorf("rebuild: failed to open ecx file: %v", err)
+ }
+ defer ecxFile.Close()
+
+ fstat, err := ecxFile.Stat()
+ if err != nil {
+ return err
+ }
+
+ ecxFileSize := fstat.Size()
+
+ ecjFile, err := os.OpenFile(baseFileName+".ecj", os.O_RDWR, 0644)
+ if err != nil {
+ return fmt.Errorf("rebuild: failed to open ecj file: %v", err)
+ }
+
+ buf := make([]byte, types.NeedleIdSize)
+ for {
+ n, _ := ecjFile.Read(buf)
+ if n != types.NeedleIdSize {
+ break
+ }
+
+ needleId := types.BytesToNeedleId(buf)
+
+ _, _, err = searchNeedleFromEcx(ecxFile, ecxFileSize, needleId, markNeedleDeleted)
+
+ if err != nil && err != NotFoundError {
+ ecxFile.Close()
+ return err
+ }
+
+ }
+
+ ecxFile.Close()
+
+ os.Remove(baseFileName + ".ecj")
+
+ return nil
+}
diff --git a/weed/storage/store_ec.go b/weed/storage/store_ec.go
index b39776dcf..96cef7169 100644
--- a/weed/storage/store_ec.go
+++ b/weed/storage/store_ec.go
@@ -15,6 +15,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/stats"
"github.com/chrislusf/seaweedfs/weed/storage/erasure_coding"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
+ "github.com/chrislusf/seaweedfs/weed/storage/types"
"github.com/klauspost/reedsolomon"
)
@@ -130,6 +131,9 @@ func (s *Store) ReadEcShardNeedle(ctx context.Context, vid needle.VolumeId, n *n
if err != nil {
return 0, fmt.Errorf("locate in local ec volume: %v", err)
}
+ if size == types.TombstoneFileSize {
+ return 0, fmt.Errorf("entry %s is deleted", n.Id)
+ }
glog.V(4).Infof("read ec volume %d offset %d size %d intervals:%+v", vid, offset.ToAcutalOffset(), size, intervals)