aboutsummaryrefslogtreecommitdiff
path: root/weed/storage
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2019-12-23 12:48:20 -0800
committerChris Lu <chris.lu@gmail.com>2019-12-23 12:48:20 -0800
commit09ca936c78c1044412f47dc06caff1bf08273e60 (patch)
treec8402cc969ca53b841bfdcb6ae23d3955a20e5ec /weed/storage
parentdda5c6d3cb32550777f1baf5bd00794e10e56d84 (diff)
downloadseaweedfs-09ca936c78c1044412f47dc06caff1bf08273e60.tar.xz
seaweedfs-09ca936c78c1044412f47dc06caff1bf08273e60.zip
shell: add ec.decode command
Diffstat (limited to 'weed/storage')
-rw-r--r--weed/storage/erasure_coding/ec_decoder.go198
-rw-r--r--weed/storage/erasure_coding/ec_encoder.go2
-rw-r--r--weed/storage/erasure_coding/ec_volume_info.go9
-rw-r--r--weed/storage/store.go9
-rw-r--r--weed/storage/store_ec.go6
-rw-r--r--weed/storage/super_block/replica_placement.go (renamed from weed/storage/replica_placement.go)2
-rw-r--r--weed/storage/super_block/replica_placement_test.go (renamed from weed/storage/replica_placement_test.go)2
-rw-r--r--weed/storage/super_block/super_block.go69
-rw-r--r--weed/storage/super_block/super_block_read.go.go44
-rw-r--r--weed/storage/super_block/super_block_test.go (renamed from weed/storage/volume_super_block_test.go)4
-rw-r--r--weed/storage/volume.go9
-rw-r--r--weed/storage/volume_backup.go12
-rw-r--r--weed/storage/volume_info.go7
-rw-r--r--weed/storage/volume_loading.go5
-rw-r--r--weed/storage/volume_read_write.go3
-rw-r--r--weed/storage/volume_super_block.go103
-rw-r--r--weed/storage/volume_vacuum.go7
-rw-r--r--weed/storage/volume_vacuum_test.go5
18 files changed, 365 insertions, 131 deletions
diff --git a/weed/storage/erasure_coding/ec_decoder.go b/weed/storage/erasure_coding/ec_decoder.go
new file mode 100644
index 000000000..ae77cee3f
--- /dev/null
+++ b/weed/storage/erasure_coding/ec_decoder.go
@@ -0,0 +1,198 @@
+package erasure_coding
+
+import (
+ "fmt"
+ "io"
+ "os"
+
+ "github.com/chrislusf/seaweedfs/weed/storage/backend"
+ "github.com/chrislusf/seaweedfs/weed/storage/idx"
+ "github.com/chrislusf/seaweedfs/weed/storage/needle"
+ "github.com/chrislusf/seaweedfs/weed/storage/needle_map"
+ "github.com/chrislusf/seaweedfs/weed/storage/super_block"
+ "github.com/chrislusf/seaweedfs/weed/storage/types"
+)
+
+// write .idx file from .ecx and .ecj files
+func WriteIdxFileFromEcIndex(baseFileName string) (err error) {
+
+ ecxFile, openErr := os.OpenFile(baseFileName+".ecx", os.O_RDONLY, 0644)
+ if openErr != nil {
+ return fmt.Errorf("cannot open ec index %s.ecx: %v", baseFileName, openErr)
+ }
+ defer ecxFile.Close()
+
+ idxFile, openErr := os.OpenFile(baseFileName+".idx", os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644)
+ if openErr != nil {
+ return fmt.Errorf("cannot open %s.idx: %v", baseFileName, openErr)
+ }
+ defer idxFile.Close()
+
+ io.Copy(idxFile, ecxFile)
+
+ err = iterateEcjFile(baseFileName, func(key types.NeedleId) error {
+
+ bytes := needle_map.ToBytes(key, types.Offset{}, types.TombstoneFileSize)
+ idxFile.Write(bytes)
+
+ return nil
+ })
+
+ return err
+}
+
+// FindDatFileSize calculate .dat file size from max offset entry
+// there may be extra deletions after that entry
+// but they are deletions anyway
+func FindDatFileSize(baseFileName string) (datSize int64, err error) {
+
+ version, err := readEcVolumeVersion(baseFileName)
+ if err != nil {
+ return 0, fmt.Errorf("read ec volume %s version: %v", baseFileName, err)
+ }
+
+ err = iterateEcxFile(baseFileName, func(key types.NeedleId, offset types.Offset, size uint32) error {
+
+ if size == types.TombstoneFileSize {
+ return nil
+ }
+
+ entryStopOffset := offset.ToAcutalOffset() + needle.GetActualSize(size, version)
+ if datSize < entryStopOffset {
+ datSize = entryStopOffset
+ }
+
+ return nil
+ })
+
+ return
+}
+
+func readEcVolumeVersion(baseFileName string) (version needle.Version, err error) {
+
+ // find volume version
+ datFile, err := os.OpenFile(baseFileName+".ec00", os.O_RDONLY, 0644)
+ if err != nil {
+ return 0, fmt.Errorf("open ec volume %s superblock: %v", baseFileName, err)
+ }
+ datBackend := backend.NewDiskFile(datFile)
+
+ superBlock, err := super_block.ReadSuperBlock(datBackend)
+ datBackend.Close()
+ if err != nil {
+ return 0, fmt.Errorf("read ec volume %s superblock: %v", baseFileName, err)
+ }
+
+ return superBlock.Version, nil
+
+}
+
+func iterateEcxFile(baseFileName string, processNeedleFn func(key types.NeedleId, offset types.Offset, size uint32) error) error {
+ ecxFile, openErr := os.OpenFile(baseFileName+".ecx", os.O_RDONLY, 0644)
+ if openErr != nil {
+ return fmt.Errorf("cannot open ec index %s.ecx: %v", baseFileName, openErr)
+ }
+ defer ecxFile.Close()
+
+ buf := make([]byte, types.NeedleMapEntrySize)
+ for {
+ n, err := ecxFile.Read(buf)
+ if n != types.NeedleMapEntrySize {
+ if err == io.EOF {
+ return nil
+ }
+ return err
+ }
+ key, offset, size := idx.IdxFileEntry(buf)
+ if processNeedleFn != nil {
+ err = processNeedleFn(key, offset, size)
+ }
+ if err != nil {
+ if err != io.EOF {
+ return err
+ }
+ return nil
+ }
+ }
+
+}
+
+func iterateEcjFile(baseFileName string, processNeedleFn func(key types.NeedleId) error) error {
+ ecjFile, openErr := os.OpenFile(baseFileName+".ecj", os.O_RDONLY, 0644)
+ if openErr != nil {
+ return fmt.Errorf("cannot open ec index %s.ecx: %v", baseFileName, openErr)
+ }
+ defer ecjFile.Close()
+
+ buf := make([]byte, types.NeedleIdSize)
+ for {
+ n, err := ecjFile.Read(buf)
+ if n != types.NeedleIdSize {
+ if err == io.EOF {
+ return nil
+ }
+ return err
+ }
+ if processNeedleFn != nil {
+ err = processNeedleFn(types.BytesToNeedleId(buf))
+ }
+ if err != nil {
+ if err == io.EOF {
+ return nil
+ }
+ return err
+ }
+ }
+
+}
+
+// WriteDatFile generates .dat from from .ec00 ~ .ec09 files
+func WriteDatFile(baseFileName string, datFileSize int64) error {
+
+ datFile, openErr := os.OpenFile(baseFileName+".dat", os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644)
+ if openErr != nil {
+ return fmt.Errorf("cannot write volume %s.dat: %v", baseFileName, openErr)
+ }
+ defer datFile.Close()
+
+ inputFiles := make([]*os.File, DataShardsCount)
+
+ for shardId := 0; shardId < DataShardsCount; shardId++ {
+ shardFileName := baseFileName + ToExt(shardId)
+ inputFiles[shardId], openErr = os.OpenFile(shardFileName, os.O_RDONLY, 0)
+ if openErr != nil {
+ return openErr
+ }
+ defer inputFiles[shardId].Close()
+ }
+
+ for datFileSize >= DataShardsCount*ErasureCodingLargeBlockSize {
+ for shardId := 0; shardId < DataShardsCount; shardId++ {
+ w, err := io.CopyN(datFile, inputFiles[shardId], ErasureCodingLargeBlockSize)
+ if w != ErasureCodingLargeBlockSize {
+ return fmt.Errorf("copy %s large block %d: %v", baseFileName, shardId, err)
+ }
+ datFileSize -= ErasureCodingLargeBlockSize
+ }
+ }
+
+ for datFileSize > 0 {
+ for shardId := 0; shardId < DataShardsCount; shardId++ {
+ toRead := min(datFileSize, ErasureCodingSmallBlockSize)
+ w, err := io.CopyN(datFile, inputFiles[shardId], toRead)
+ if w != toRead {
+ return fmt.Errorf("copy %s small block %d: %v", baseFileName, shardId, err)
+ }
+ datFileSize -= toRead
+ }
+ }
+
+ return nil
+}
+
+func min(x, y int64) int64 {
+ if x > y {
+ return y
+ }
+ return x
+}
diff --git a/weed/storage/erasure_coding/ec_encoder.go b/weed/storage/erasure_coding/ec_encoder.go
index 75369dc8d..eeb384b91 100644
--- a/weed/storage/erasure_coding/ec_encoder.go
+++ b/weed/storage/erasure_coding/ec_encoder.go
@@ -49,7 +49,7 @@ func WriteSortedFileFromIdx(baseFileName string, ext string) (e error) {
return nil
}
-// WriteEcFiles generates .ec01 ~ .ec14 files
+// WriteEcFiles generates .ec00 ~ .ec13 files
func WriteEcFiles(baseFileName string) error {
return generateEcFiles(baseFileName, 256*1024, ErasureCodingLargeBlockSize, ErasureCodingSmallBlockSize)
}
diff --git a/weed/storage/erasure_coding/ec_volume_info.go b/weed/storage/erasure_coding/ec_volume_info.go
index c9e85c662..bb1fd0bf8 100644
--- a/weed/storage/erasure_coding/ec_volume_info.go
+++ b/weed/storage/erasure_coding/ec_volume_info.go
@@ -81,6 +81,15 @@ func (b ShardBits) ShardIds() (ret []ShardId) {
return
}
+func (b ShardBits) ToUint32Slice() (ret []uint32) {
+ for i := uint32(0); i < TotalShardsCount; i++ {
+ if b.HasShardId(ShardId(i)) {
+ ret = append(ret, i)
+ }
+ }
+ return
+}
+
func (b ShardBits) ShardIdCount() (count int) {
for count = 0; b > 0; count++ {
b &= b - 1
diff --git a/weed/storage/store.go b/weed/storage/store.go
index 1ef97bbea..512f72ceb 100644
--- a/weed/storage/store.go
+++ b/weed/storage/store.go
@@ -10,6 +10,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
"github.com/chrislusf/seaweedfs/weed/stats"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
+ "github.com/chrislusf/seaweedfs/weed/storage/super_block"
. "github.com/chrislusf/seaweedfs/weed/storage/types"
)
@@ -61,7 +62,7 @@ func NewStore(grpcDialOption grpc.DialOption, port int, ip, publicUrl string, di
return
}
func (s *Store) AddVolume(volumeId needle.VolumeId, collection string, needleMapKind NeedleMapType, replicaPlacement string, ttlString string, preallocate int64, MemoryMapMaxSizeMb uint32) error {
- rt, e := NewReplicaPlacementFromString(replicaPlacement)
+ rt, e := super_block.NewReplicaPlacementFromString(replicaPlacement)
if e != nil {
return e
}
@@ -102,7 +103,7 @@ func (s *Store) FindFreeLocation() (ret *DiskLocation) {
}
return ret
}
-func (s *Store) addVolume(vid needle.VolumeId, collection string, needleMapKind NeedleMapType, replicaPlacement *ReplicaPlacement, ttl *needle.TTL, preallocate int64, memoryMapMaxSizeMb uint32) error {
+func (s *Store) addVolume(vid needle.VolumeId, collection string, needleMapKind NeedleMapType, replicaPlacement *super_block.ReplicaPlacement, ttl *needle.TTL, preallocate int64, memoryMapMaxSizeMb uint32) error {
if s.findVolume(vid) != nil {
return fmt.Errorf("Volume Id %d already exists!", vid)
}
@@ -229,7 +230,7 @@ func (s *Store) WriteVolumeNeedle(i needle.VolumeId, n *needle.Needle) (size uin
err = fmt.Errorf("volume %d is read only", i)
return
}
- if MaxPossibleVolumeSize >= v.ContentSize()+uint64(needle.GetActualSize(size, v.version)) {
+ if MaxPossibleVolumeSize >= v.ContentSize()+uint64(needle.GetActualSize(size, v.Version())) {
_, size, isUnchanged, err = v.writeNeedle(n)
} else {
err = fmt.Errorf("volume size limit %d exceeded! current size is %d", s.GetVolumeSizeLimit(), v.ContentSize())
@@ -246,7 +247,7 @@ func (s *Store) DeleteVolumeNeedle(i needle.VolumeId, n *needle.Needle) (uint32,
if v.noWriteOrDelete {
return 0, fmt.Errorf("volume %d is read only", i)
}
- if MaxPossibleVolumeSize >= v.ContentSize()+uint64(needle.GetActualSize(0, v.version)) {
+ if MaxPossibleVolumeSize >= v.ContentSize()+uint64(needle.GetActualSize(0, v.Version())) {
return v.deleteNeedle(n)
} else {
return 0, fmt.Errorf("volume size limit %d exceeded! current size is %d", s.GetVolumeSizeLimit(), v.ContentSize())
diff --git a/weed/storage/store_ec.go b/weed/storage/store_ec.go
index 208560b7e..4c160d2d8 100644
--- a/weed/storage/store_ec.go
+++ b/weed/storage/store_ec.go
@@ -8,6 +8,8 @@ import (
"sync"
"time"
+ "github.com/klauspost/reedsolomon"
+
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/operation"
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
@@ -15,8 +17,8 @@ import (
"github.com/chrislusf/seaweedfs/weed/stats"
"github.com/chrislusf/seaweedfs/weed/storage/erasure_coding"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
+ "github.com/chrislusf/seaweedfs/weed/storage/super_block"
"github.com/chrislusf/seaweedfs/weed/storage/types"
- "github.com/klauspost/reedsolomon"
)
func (s *Store) CollectErasureCodingHeartbeat() *master_pb.Heartbeat {
@@ -169,7 +171,7 @@ func (s *Store) readEcVolumeVersion(ctx context.Context, vid needle.VolumeId, ec
interval := erasure_coding.Interval{
BlockIndex: 0,
InnerBlockOffset: 0,
- Size: _SuperBlockSize,
+ Size: super_block.SuperBlockSize,
IsLargeBlock: true, // it could be large block, but ok in this place
LargeBlockRowsCount: 0,
}
diff --git a/weed/storage/replica_placement.go b/weed/storage/super_block/replica_placement.go
index c1aca52eb..fcccbba7d 100644
--- a/weed/storage/replica_placement.go
+++ b/weed/storage/super_block/replica_placement.go
@@ -1,4 +1,4 @@
-package storage
+package super_block
import (
"errors"
diff --git a/weed/storage/replica_placement_test.go b/weed/storage/super_block/replica_placement_test.go
index 7968af7cb..7742ba548 100644
--- a/weed/storage/replica_placement_test.go
+++ b/weed/storage/super_block/replica_placement_test.go
@@ -1,4 +1,4 @@
-package storage
+package super_block
import (
"testing"
diff --git a/weed/storage/super_block/super_block.go b/weed/storage/super_block/super_block.go
new file mode 100644
index 000000000..f48cd0bdc
--- /dev/null
+++ b/weed/storage/super_block/super_block.go
@@ -0,0 +1,69 @@
+package super_block
+
+import (
+ "github.com/golang/protobuf/proto"
+
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
+ "github.com/chrislusf/seaweedfs/weed/storage/needle"
+ "github.com/chrislusf/seaweedfs/weed/util"
+)
+
+const (
+ SuperBlockSize = 8
+)
+
+/*
+* Super block currently has 8 bytes allocated for each volume.
+* Byte 0: version, 1 or 2
+* Byte 1: Replica Placement strategy, 000, 001, 002, 010, etc
+* Byte 2 and byte 3: Time to live. See TTL for definition
+* Byte 4 and byte 5: The number of times the volume has been compacted.
+* Rest bytes: Reserved
+ */
+type SuperBlock struct {
+ Version needle.Version
+ ReplicaPlacement *ReplicaPlacement
+ Ttl *needle.TTL
+ CompactionRevision uint16
+ Extra *master_pb.SuperBlockExtra
+ ExtraSize uint16
+}
+
+func (s *SuperBlock) BlockSize() int {
+ switch s.Version {
+ case needle.Version2, needle.Version3:
+ return SuperBlockSize + int(s.ExtraSize)
+ }
+ return SuperBlockSize
+}
+
+func (s *SuperBlock) Bytes() []byte {
+ header := make([]byte, SuperBlockSize)
+ header[0] = byte(s.Version)
+ header[1] = s.ReplicaPlacement.Byte()
+ s.Ttl.ToBytes(header[2:4])
+ util.Uint16toBytes(header[4:6], s.CompactionRevision)
+
+ if s.Extra != nil {
+ extraData, err := proto.Marshal(s.Extra)
+ if err != nil {
+ glog.Fatalf("cannot marshal super block extra %+v: %v", s.Extra, err)
+ }
+ extraSize := len(extraData)
+ if extraSize > 256*256-2 {
+ // reserve a couple of bits for future extension
+ glog.Fatalf("super block extra size is %d bigger than %d", extraSize, 256*256-2)
+ }
+ s.ExtraSize = uint16(extraSize)
+ util.Uint16toBytes(header[6:8], s.ExtraSize)
+
+ header = append(header, extraData...)
+ }
+
+ return header
+}
+
+func (s *SuperBlock) Initialized() bool {
+ return s.ReplicaPlacement != nil && s.Ttl != nil
+}
diff --git a/weed/storage/super_block/super_block_read.go.go b/weed/storage/super_block/super_block_read.go.go
new file mode 100644
index 000000000..9eb12e116
--- /dev/null
+++ b/weed/storage/super_block/super_block_read.go.go
@@ -0,0 +1,44 @@
+package super_block
+
+import (
+ "fmt"
+
+ "github.com/golang/protobuf/proto"
+
+ "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
+ "github.com/chrislusf/seaweedfs/weed/storage/backend"
+ "github.com/chrislusf/seaweedfs/weed/storage/needle"
+ "github.com/chrislusf/seaweedfs/weed/util"
+)
+
+// ReadSuperBlock reads from data file and load it into volume's super block
+func ReadSuperBlock(datBackend backend.BackendStorageFile) (superBlock SuperBlock, err error) {
+
+ header := make([]byte, SuperBlockSize)
+ if _, e := datBackend.ReadAt(header, 0); e != nil {
+ err = fmt.Errorf("cannot read volume %s super block: %v", datBackend.Name(), e)
+ return
+ }
+
+ superBlock.Version = needle.Version(header[0])
+ if superBlock.ReplicaPlacement, err = NewReplicaPlacementFromByte(header[1]); err != nil {
+ err = fmt.Errorf("cannot read replica type: %s", err.Error())
+ return
+ }
+ superBlock.Ttl = needle.LoadTTLFromBytes(header[2:4])
+ superBlock.CompactionRevision = util.BytesToUint16(header[4:6])
+ superBlock.ExtraSize = util.BytesToUint16(header[6:8])
+
+ if superBlock.ExtraSize > 0 {
+ // read more
+ extraData := make([]byte, int(superBlock.ExtraSize))
+ superBlock.Extra = &master_pb.SuperBlockExtra{}
+ err = proto.Unmarshal(extraData, superBlock.Extra)
+ if err != nil {
+ err = fmt.Errorf("cannot read volume %s super block extra: %v", datBackend.Name(), err)
+ return
+ }
+ }
+
+ return
+}
diff --git a/weed/storage/volume_super_block_test.go b/weed/storage/super_block/super_block_test.go
index 06ad8a5d3..25699070d 100644
--- a/weed/storage/volume_super_block_test.go
+++ b/weed/storage/super_block/super_block_test.go
@@ -1,4 +1,4 @@
-package storage
+package super_block
import (
"testing"
@@ -10,7 +10,7 @@ func TestSuperBlockReadWrite(t *testing.T) {
rp, _ := NewReplicaPlacementFromByte(byte(001))
ttl, _ := needle.ReadTTL("15d")
s := &SuperBlock{
- version: needle.CurrentVersion,
+ Version: needle.CurrentVersion,
ReplicaPlacement: rp,
Ttl: ttl,
}
diff --git a/weed/storage/volume.go b/weed/storage/volume.go
index c92822e5c..eaeed4e77 100644
--- a/weed/storage/volume.go
+++ b/weed/storage/volume.go
@@ -12,6 +12,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/stats"
"github.com/chrislusf/seaweedfs/weed/storage/backend"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
+ "github.com/chrislusf/seaweedfs/weed/storage/super_block"
"github.com/chrislusf/seaweedfs/weed/storage/types"
"github.com/chrislusf/seaweedfs/weed/glog"
@@ -28,7 +29,7 @@ type Volume struct {
noWriteCanDelete bool // if readonly, either noWriteOrDelete or noWriteCanDelete
MemoryMapMaxSizeMb uint32
- SuperBlock
+ super_block.SuperBlock
dataFileAccessLock sync.RWMutex
lastModifiedTsSeconds uint64 //unix time in seconds
@@ -42,10 +43,10 @@ type Volume struct {
volumeTierInfo *volume_server_pb.VolumeTierInfo
}
-func NewVolume(dirname string, collection string, id needle.VolumeId, needleMapKind NeedleMapType, replicaPlacement *ReplicaPlacement, ttl *needle.TTL, preallocate int64, memoryMapMaxSizeMb uint32) (v *Volume, e error) {
+func NewVolume(dirname string, collection string, id needle.VolumeId, needleMapKind NeedleMapType, replicaPlacement *super_block.ReplicaPlacement, ttl *needle.TTL, preallocate int64, memoryMapMaxSizeMb uint32) (v *Volume, e error) {
// if replicaPlacement is nil, the superblock will be loaded from disk
v = &Volume{dir: dirname, Collection: collection, Id: id, MemoryMapMaxSizeMb: memoryMapMaxSizeMb}
- v.SuperBlock = SuperBlock{ReplicaPlacement: replicaPlacement, Ttl: ttl}
+ v.SuperBlock = super_block.SuperBlock{ReplicaPlacement: replicaPlacement, Ttl: ttl}
v.needleMapKind = needleMapKind
e = v.load(true, true, needleMapKind, preallocate)
return
@@ -68,7 +69,7 @@ func (v *Volume) FileName() (fileName string) {
}
func (v *Volume) Version() needle.Version {
- return v.SuperBlock.Version()
+ return v.SuperBlock.Version
}
func (v *Volume) FileStat() (datSize uint64, idxSize uint64, modTime time.Time) {
diff --git a/weed/storage/volume_backup.go b/weed/storage/volume_backup.go
index efe1345c6..ec29c895e 100644
--- a/weed/storage/volume_backup.go
+++ b/weed/storage/volume_backup.go
@@ -6,12 +6,14 @@ import (
"io"
"os"
+ "google.golang.org/grpc"
+
"github.com/chrislusf/seaweedfs/weed/operation"
"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
"github.com/chrislusf/seaweedfs/weed/storage/idx"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
+ "github.com/chrislusf/seaweedfs/weed/storage/super_block"
. "github.com/chrislusf/seaweedfs/weed/storage/types"
- "google.golang.org/grpc"
)
func (v *Volume) GetVolumeSyncStatus() *volume_server_pb.VolumeSyncStatusResponse {
@@ -108,7 +110,7 @@ func (v *Volume) IncrementalBackup(volumeServer string, grpcDialOption grpc.Dial
}
// add to needle map
- return ScanVolumeFileFrom(v.version, v.DataBackend, int64(startFromOffset), &VolumeFileScanner4GenIdx{v: v})
+ return ScanVolumeFileFrom(v.Version(), v.DataBackend, int64(startFromOffset), &VolumeFileScanner4GenIdx{v: v})
}
@@ -154,11 +156,11 @@ func (v *Volume) locateLastAppendEntry() (Offset, error) {
func (v *Volume) readAppendAtNs(offset Offset) (uint64, error) {
- n, _, bodyLength, err := needle.ReadNeedleHeader(v.DataBackend, v.SuperBlock.version, offset.ToAcutalOffset())
+ n, _, bodyLength, err := needle.ReadNeedleHeader(v.DataBackend, v.SuperBlock.Version, offset.ToAcutalOffset())
if err != nil {
return 0, fmt.Errorf("ReadNeedleHeader: %v", err)
}
- _, err = n.ReadNeedleBody(v.DataBackend, v.SuperBlock.version, offset.ToAcutalOffset()+int64(NeedleHeaderSize), bodyLength)
+ _, err = n.ReadNeedleBody(v.DataBackend, v.SuperBlock.Version, offset.ToAcutalOffset()+int64(NeedleHeaderSize), bodyLength)
if err != nil {
return 0, fmt.Errorf("ReadNeedleBody offset %d, bodyLength %d: %v", offset.ToAcutalOffset(), bodyLength, err)
}
@@ -244,7 +246,7 @@ type VolumeFileScanner4GenIdx struct {
v *Volume
}
-func (scanner *VolumeFileScanner4GenIdx) VisitSuperBlock(superBlock SuperBlock) error {
+func (scanner *VolumeFileScanner4GenIdx) VisitSuperBlock(superBlock super_block.SuperBlock) error {
return nil
}
diff --git a/weed/storage/volume_info.go b/weed/storage/volume_info.go
index 3b9b667eb..313818cde 100644
--- a/weed/storage/volume_info.go
+++ b/weed/storage/volume_info.go
@@ -6,12 +6,13 @@ import (
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
+ "github.com/chrislusf/seaweedfs/weed/storage/super_block"
)
type VolumeInfo struct {
Id needle.VolumeId
Size uint64
- ReplicaPlacement *ReplicaPlacement
+ ReplicaPlacement *super_block.ReplicaPlacement
Ttl *needle.TTL
Collection string
Version needle.Version
@@ -40,7 +41,7 @@ func NewVolumeInfo(m *master_pb.VolumeInformationMessage) (vi VolumeInfo, err er
RemoteStorageName: m.RemoteStorageName,
RemoteStorageKey: m.RemoteStorageKey,
}
- rp, e := NewReplicaPlacementFromByte(byte(m.ReplicaPlacement))
+ rp, e := super_block.NewReplicaPlacementFromByte(byte(m.ReplicaPlacement))
if e != nil {
return vi, e
}
@@ -55,7 +56,7 @@ func NewVolumeInfoFromShort(m *master_pb.VolumeShortInformationMessage) (vi Volu
Collection: m.Collection,
Version: needle.Version(m.Version),
}
- rp, e := NewReplicaPlacementFromByte(byte(m.ReplicaPlacement))
+ rp, e := super_block.NewReplicaPlacementFromByte(byte(m.ReplicaPlacement))
if e != nil {
return vi, e
}
diff --git a/weed/storage/volume_loading.go b/weed/storage/volume_loading.go
index 5c1ac172a..eb177e850 100644
--- a/weed/storage/volume_loading.go
+++ b/weed/storage/volume_loading.go
@@ -11,11 +11,12 @@ import (
"github.com/chrislusf/seaweedfs/weed/stats"
"github.com/chrislusf/seaweedfs/weed/storage/backend"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
+ "github.com/chrislusf/seaweedfs/weed/storage/super_block"
)
func loadVolumeWithoutIndex(dirname string, collection string, id needle.VolumeId, needleMapKind NeedleMapType) (v *Volume, err error) {
v = &Volume{dir: dirname, Collection: collection, Id: id}
- v.SuperBlock = SuperBlock{}
+ v.SuperBlock = super_block.SuperBlock{}
v.needleMapKind = needleMapKind
err = v.load(false, false, needleMapKind, 0)
return
@@ -43,7 +44,7 @@ func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool, needleMapKind
v.noWriteOrDelete = true
}
v.lastModifiedTsSeconds = uint64(modifiedTime.Unix())
- if fileSize >= _SuperBlockSize {
+ if fileSize >= super_block.SuperBlockSize {
alreadyHasSuperBlock = true
}
v.DataBackend = backend.NewDiskFile(dataFile)
diff --git a/weed/storage/volume_read_write.go b/weed/storage/volume_read_write.go
index 4eb04a13b..a08e64077 100644
--- a/weed/storage/volume_read_write.go
+++ b/weed/storage/volume_read_write.go
@@ -11,6 +11,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/storage/backend"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
+ "github.com/chrislusf/seaweedfs/weed/storage/super_block"
. "github.com/chrislusf/seaweedfs/weed/storage/types"
)
@@ -165,7 +166,7 @@ func (v *Volume) readNeedle(n *needle.Needle) (int, error) {
}
type VolumeFileScanner interface {
- VisitSuperBlock(SuperBlock) error
+ VisitSuperBlock(super_block.SuperBlock) error
ReadNeedleBody() bool
VisitNeedle(n *needle.Needle, offset int64, needleHeader, needleBody []byte) error
}
diff --git a/weed/storage/volume_super_block.go b/weed/storage/volume_super_block.go
index 519691260..61c09d85a 100644
--- a/weed/storage/volume_super_block.go
+++ b/weed/storage/volume_super_block.go
@@ -1,79 +1,14 @@
package storage
import (
- "fmt"
"os"
"github.com/chrislusf/seaweedfs/weed/glog"
- "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
"github.com/chrislusf/seaweedfs/weed/storage/backend"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
- "github.com/chrislusf/seaweedfs/weed/util"
- "github.com/golang/protobuf/proto"
+ "github.com/chrislusf/seaweedfs/weed/storage/super_block"
)
-const (
- _SuperBlockSize = 8
-)
-
-/*
-* Super block currently has 8 bytes allocated for each volume.
-* Byte 0: version, 1 or 2
-* Byte 1: Replica Placement strategy, 000, 001, 002, 010, etc
-* Byte 2 and byte 3: Time to live. See TTL for definition
-* Byte 4 and byte 5: The number of times the volume has been compacted.
-* Rest bytes: Reserved
- */
-type SuperBlock struct {
- version needle.Version
- ReplicaPlacement *ReplicaPlacement
- Ttl *needle.TTL
- CompactionRevision uint16
- Extra *master_pb.SuperBlockExtra
- extraSize uint16
-}
-
-func (s *SuperBlock) BlockSize() int {
- switch s.version {
- case needle.Version2, needle.Version3:
- return _SuperBlockSize + int(s.extraSize)
- }
- return _SuperBlockSize
-}
-
-func (s *SuperBlock) Version() needle.Version {
- return s.version
-}
-func (s *SuperBlock) Bytes() []byte {
- header := make([]byte, _SuperBlockSize)
- header[0] = byte(s.version)
- header[1] = s.ReplicaPlacement.Byte()
- s.Ttl.ToBytes(header[2:4])
- util.Uint16toBytes(header[4:6], s.CompactionRevision)
-
- if s.Extra != nil {
- extraData, err := proto.Marshal(s.Extra)
- if err != nil {
- glog.Fatalf("cannot marshal super block extra %+v: %v", s.Extra, err)
- }
- extraSize := len(extraData)
- if extraSize > 256*256-2 {
- // reserve a couple of bits for future extension
- glog.Fatalf("super block extra size is %d bigger than %d", extraSize, 256*256-2)
- }
- s.extraSize = uint16(extraSize)
- util.Uint16toBytes(header[6:8], s.extraSize)
-
- header = append(header, extraData...)
- }
-
- return header
-}
-
-func (s *SuperBlock) Initialized() bool {
- return s.ReplicaPlacement != nil && s.Ttl != nil
-}
-
func (v *Volume) maybeWriteSuperBlock() error {
datSize, _, e := v.DataBackend.GetStat()
@@ -82,7 +17,7 @@ func (v *Volume) maybeWriteSuperBlock() error {
return e
}
if datSize == 0 {
- v.SuperBlock.version = needle.CurrentVersion
+ v.SuperBlock.Version = needle.CurrentVersion
_, e = v.DataBackend.WriteAt(v.SuperBlock.Bytes(), 0)
if e != nil && os.IsPermission(e) {
//read-only, but zero length - recreate it!
@@ -100,38 +35,6 @@ func (v *Volume) maybeWriteSuperBlock() error {
}
func (v *Volume) readSuperBlock() (err error) {
- v.SuperBlock, err = ReadSuperBlock(v.DataBackend)
+ v.SuperBlock, err = super_block.ReadSuperBlock(v.DataBackend)
return err
}
-
-// ReadSuperBlock reads from data file and load it into volume's super block
-func ReadSuperBlock(datBackend backend.BackendStorageFile) (superBlock SuperBlock, err error) {
-
- header := make([]byte, _SuperBlockSize)
- if _, e := datBackend.ReadAt(header, 0); e != nil {
- err = fmt.Errorf("cannot read volume %s super block: %v", datBackend.Name(), e)
- return
- }
-
- superBlock.version = needle.Version(header[0])
- if superBlock.ReplicaPlacement, err = NewReplicaPlacementFromByte(header[1]); err != nil {
- err = fmt.Errorf("cannot read replica type: %s", err.Error())
- return
- }
- superBlock.Ttl = needle.LoadTTLFromBytes(header[2:4])
- superBlock.CompactionRevision = util.BytesToUint16(header[4:6])
- superBlock.extraSize = util.BytesToUint16(header[6:8])
-
- if superBlock.extraSize > 0 {
- // read more
- extraData := make([]byte, int(superBlock.extraSize))
- superBlock.Extra = &master_pb.SuperBlockExtra{}
- err = proto.Unmarshal(extraData, superBlock.Extra)
- if err != nil {
- err = fmt.Errorf("cannot read volume %s super block extra: %v", datBackend.Name(), err)
- return
- }
- }
-
- return
-}
diff --git a/weed/storage/volume_vacuum.go b/weed/storage/volume_vacuum.go
index caa1777e4..cb51a449c 100644
--- a/weed/storage/volume_vacuum.go
+++ b/weed/storage/volume_vacuum.go
@@ -11,6 +11,7 @@ import (
idx2 "github.com/chrislusf/seaweedfs/weed/storage/idx"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
"github.com/chrislusf/seaweedfs/weed/storage/needle_map"
+ "github.com/chrislusf/seaweedfs/weed/storage/super_block"
. "github.com/chrislusf/seaweedfs/weed/storage/types"
"github.com/chrislusf/seaweedfs/weed/util"
)
@@ -133,7 +134,7 @@ func (v *Volume) cleanupCompact() error {
}
func fetchCompactRevisionFromDatFile(datBackend backend.BackendStorageFile) (compactRevision uint16, err error) {
- superBlock, err := ReadSuperBlock(datBackend)
+ superBlock, err := super_block.ReadSuperBlock(datBackend)
if err != nil {
return 0, err
}
@@ -277,8 +278,8 @@ type VolumeFileScanner4Vacuum struct {
writeThrottler *util.WriteThrottler
}
-func (scanner *VolumeFileScanner4Vacuum) VisitSuperBlock(superBlock SuperBlock) error {
- scanner.version = superBlock.Version()
+func (scanner *VolumeFileScanner4Vacuum) VisitSuperBlock(superBlock super_block.SuperBlock) error {
+ scanner.version = superBlock.Version
superBlock.CompactionRevision++
_, err := scanner.dstBackend.WriteAt(superBlock.Bytes(), 0)
scanner.newOffset = int64(superBlock.BlockSize())
diff --git a/weed/storage/volume_vacuum_test.go b/weed/storage/volume_vacuum_test.go
index ba1e59f2c..07b9f70c1 100644
--- a/weed/storage/volume_vacuum_test.go
+++ b/weed/storage/volume_vacuum_test.go
@@ -8,6 +8,7 @@ import (
"time"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
+ "github.com/chrislusf/seaweedfs/weed/storage/super_block"
"github.com/chrislusf/seaweedfs/weed/storage/types"
)
@@ -46,7 +47,7 @@ func TestMakeDiff(t *testing.T) {
v := new(Volume)
//lastCompactIndexOffset value is the index file size before step 4
v.lastCompactIndexOffset = 96
- v.SuperBlock.version = 0x2
+ v.SuperBlock.Version = 0x2
/*
err := v.makeupDiff(
"/yourpath/1.cpd",
@@ -68,7 +69,7 @@ func TestCompaction(t *testing.T) {
}
defer os.RemoveAll(dir) // clean up
- v, err := NewVolume(dir, "", 1, NeedleMapInMemory, &ReplicaPlacement{}, &needle.TTL{}, 0, 0)
+ v, err := NewVolume(dir, "", 1, NeedleMapInMemory, &super_block.ReplicaPlacement{}, &needle.TTL{}, 0, 0)
if err != nil {
t.Fatalf("volume creation: %v", err)
}