aboutsummaryrefslogtreecommitdiff
path: root/weed/storage
diff options
context:
space:
mode:
Diffstat (limited to 'weed/storage')
-rw-r--r--weed/storage/disk_location.go24
-rw-r--r--weed/storage/needle/needle_read_write.go7
-rw-r--r--weed/storage/store.go40
-rw-r--r--weed/storage/volume_checking.go84
-rw-r--r--weed/storage/volume_loading.go2
-rw-r--r--weed/storage/volume_read_write.go23
6 files changed, 145 insertions, 35 deletions
diff --git a/weed/storage/disk_location.go b/weed/storage/disk_location.go
index ed57aa54b..775ebf092 100644
--- a/weed/storage/disk_location.go
+++ b/weed/storage/disk_location.go
@@ -13,14 +13,16 @@ import (
"github.com/chrislusf/seaweedfs/weed/stats"
"github.com/chrislusf/seaweedfs/weed/storage/erasure_coding"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
+ "github.com/chrislusf/seaweedfs/weed/util"
)
type DiskLocation struct {
- Directory string
- MaxVolumeCount int
- MinFreeSpacePercent float32
- volumes map[needle.VolumeId]*Volume
- volumesLock sync.RWMutex
+ Directory string
+ MaxVolumeCount int
+ OriginalMaxVolumeCount int
+ MinFreeSpacePercent float32
+ volumes map[needle.VolumeId]*Volume
+ volumesLock sync.RWMutex
// erasure coding
ecVolumes map[needle.VolumeId]*erasure_coding.EcVolume
@@ -30,7 +32,7 @@ type DiskLocation struct {
}
func NewDiskLocation(dir string, maxVolumeCount int, minFreeSpacePercent float32) *DiskLocation {
- location := &DiskLocation{Directory: dir, MaxVolumeCount: maxVolumeCount, MinFreeSpacePercent: minFreeSpacePercent}
+ location := &DiskLocation{Directory: dir, MaxVolumeCount: maxVolumeCount, OriginalMaxVolumeCount: maxVolumeCount, MinFreeSpacePercent: minFreeSpacePercent}
location.volumes = make(map[needle.VolumeId]*Volume)
location.ecVolumes = make(map[needle.VolumeId]*erasure_coding.EcVolume)
go location.CheckDiskSpace()
@@ -60,6 +62,14 @@ func parseCollectionVolumeId(base string) (collection string, vid needle.VolumeI
func (l *DiskLocation) loadExistingVolume(fileInfo os.FileInfo, needleMapKind NeedleMapType) bool {
name := fileInfo.Name()
if !fileInfo.IsDir() && strings.HasSuffix(name, ".idx") {
+ name := name[:len(name)-len(".idx")]
+ noteFile := l.Directory + "/" + name + ".note"
+ if util.FileExists(noteFile) {
+ note, _ := ioutil.ReadFile(noteFile)
+ glog.Warningf("volume %s was not completed: %s", name, string(note))
+ removeVolumeFiles(l.Directory + "/" + name)
+ return false
+ }
vid, collection, err := l.volumeIdFromPath(fileInfo)
if err != nil {
glog.Warningf("get volume id failed, %s, err : %s", name, err)
@@ -85,7 +95,7 @@ func (l *DiskLocation) loadExistingVolume(fileInfo os.FileInfo, needleMapKind Ne
size, _, _ := v.FileStat()
glog.V(0).Infof("data file %s, replicaPlacement=%s v=%d size=%d ttl=%s",
- l.Directory+"/"+name, v.ReplicaPlacement, v.Version(), size, v.Ttl.String())
+ l.Directory+"/"+name+".dat", v.ReplicaPlacement, v.Version(), size, v.Ttl.String())
return true
}
return false
diff --git a/weed/storage/needle/needle_read_write.go b/weed/storage/needle/needle_read_write.go
index 89fc85b0d..e758a6fee 100644
--- a/weed/storage/needle/needle_read_write.go
+++ b/weed/storage/needle/needle_read_write.go
@@ -24,6 +24,8 @@ const (
TtlBytesLength = 2
)
+var ErrorSizeMismatch = errors.New("size mismatch")
+
func (n *Needle) DiskSize(version Version) int64 {
return GetActualSize(n.Size, version)
}
@@ -168,6 +170,11 @@ func ReadNeedleBlob(r backend.BackendStorageFile, offset int64, size Size, versi
func (n *Needle) ReadBytes(bytes []byte, offset int64, size Size, version Version) (err error) {
n.ParseNeedleHeader(bytes)
if n.Size != size {
+ // cookie is not always passed in for this API. Use size to do preliminary checking.
+ if OffsetSize == 4 && offset < int64(MaxPossibleVolumeSize) {
+ glog.Errorf("entry not found1: offset %d found id %x size %d, expected size %d", offset, n.Id, n.Size, size)
+ return ErrorSizeMismatch
+ }
return fmt.Errorf("entry not found: offset %d found id %x size %d, expected size %d", offset, n.Id, n.Size, size)
}
switch version {
diff --git a/weed/storage/store.go b/weed/storage/store.go
index b9fcfcba9..38f167cef 100644
--- a/weed/storage/store.go
+++ b/weed/storage/store.go
@@ -194,13 +194,19 @@ func (s *Store) SetDataCenter(dataCenter string) {
func (s *Store) SetRack(rack string) {
s.rack = rack
}
+func (s *Store) GetDataCenter() string {
+ return s.dataCenter
+}
+func (s *Store) GetRack() string {
+ return s.rack
+}
func (s *Store) CollectHeartbeat() *master_pb.Heartbeat {
var volumeMessages []*master_pb.VolumeInformationMessage
maxVolumeCount := 0
var maxFileKey NeedleId
collectionVolumeSize := make(map[string]uint64)
- collectionVolumeReadOnlyCount := make(map[string]uint8)
+ collectionVolumeReadOnlyCount := make(map[string]map[string]uint8)
for _, location := range s.Locations {
var deleteVids []needle.VolumeId
maxVolumeCount = maxVolumeCount + location.MaxVolumeCount
@@ -220,11 +226,24 @@ func (s *Store) CollectHeartbeat() *master_pb.Heartbeat {
}
}
collectionVolumeSize[v.Collection] += volumeMessage.Size
+ if _, exist := collectionVolumeReadOnlyCount[v.Collection]; !exist {
+ collectionVolumeReadOnlyCount[v.Collection] = map[string]uint8{
+ "IsReadOnly": 0,
+ "noWriteOrDelete": 0,
+ "noWriteCanDelete": 0,
+ "isDiskSpaceLow": 0,
+ }
+ }
if v.IsReadOnly() {
- collectionVolumeReadOnlyCount[v.Collection] += 1
- } else {
- if _, exist := collectionVolumeReadOnlyCount[v.Collection]; !exist {
- collectionVolumeReadOnlyCount[v.Collection] = 0
+ collectionVolumeReadOnlyCount[v.Collection]["IsReadOnly"] += 1
+ if v.noWriteOrDelete {
+ collectionVolumeReadOnlyCount[v.Collection]["noWriteOrDelete"] += 1
+ }
+ if v.noWriteCanDelete {
+ collectionVolumeReadOnlyCount[v.Collection]["noWriteCanDelete"] += 1
+ }
+ if v.location.isDiskSpaceLow {
+ collectionVolumeReadOnlyCount[v.Collection]["isDiskSpaceLow"] += 1
}
}
}
@@ -251,8 +270,10 @@ func (s *Store) CollectHeartbeat() *master_pb.Heartbeat {
stats.VolumeServerDiskSizeGauge.WithLabelValues(col, "normal").Set(float64(size))
}
- for col, count := range collectionVolumeReadOnlyCount {
- stats.VolumeServerReadOnlyVolumeGauge.WithLabelValues(col, "normal").Set(float64(count))
+ for col, types := range collectionVolumeReadOnlyCount {
+ for t, count := range types {
+ stats.VolumeServerReadOnlyVolumeGauge.WithLabelValues(col, t).Set(float64(count))
+ }
}
return &master_pb.Heartbeat{
@@ -440,7 +461,8 @@ func (s *Store) GetVolumeSizeLimit() uint64 {
func (s *Store) MaybeAdjustVolumeMax() (hasChanges bool) {
volumeSizeLimit := s.GetVolumeSizeLimit()
for _, diskLocation := range s.Locations {
- if diskLocation.MaxVolumeCount == 0 {
+ if diskLocation.OriginalMaxVolumeCount == 0 {
+ currentMaxVolumeCount := diskLocation.MaxVolumeCount
diskStatus := stats.NewDiskStatus(diskLocation.Directory)
unusedSpace := diskLocation.UnUsedSpace(volumeSizeLimit)
unclaimedSpaces := int64(diskStatus.Free) - int64(unusedSpace)
@@ -452,7 +474,7 @@ func (s *Store) MaybeAdjustVolumeMax() (hasChanges bool) {
diskLocation.MaxVolumeCount = maxVolumeCount
glog.V(0).Infof("disk %s max %d unclaimedSpace:%dMB, unused:%dMB volumeSizeLimit:%dMB",
diskLocation.Directory, maxVolumeCount, unclaimedSpaces/1024/1024, unusedSpace/1024/1024, volumeSizeLimit/1024/1024)
- hasChanges = true
+ hasChanges = hasChanges || currentMaxVolumeCount != diskLocation.MaxVolumeCount
}
}
return
diff --git a/weed/storage/volume_checking.go b/weed/storage/volume_checking.go
index e42fb238b..00e04047f 100644
--- a/weed/storage/volume_checking.go
+++ b/weed/storage/volume_checking.go
@@ -2,8 +2,10 @@ package storage
import (
"fmt"
+ "io"
"os"
+ "github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/storage/backend"
"github.com/chrislusf/seaweedfs/weed/storage/idx"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
@@ -11,17 +13,40 @@ import (
"github.com/chrislusf/seaweedfs/weed/util"
)
-func CheckVolumeDataIntegrity(v *Volume, indexFile *os.File) (lastAppendAtNs uint64, e error) {
+func CheckAndFixVolumeDataIntegrity(v *Volume, indexFile *os.File) (lastAppendAtNs uint64, err error) {
var indexSize int64
- if indexSize, e = verifyIndexFileIntegrity(indexFile); e != nil {
- return 0, fmt.Errorf("verifyIndexFileIntegrity %s failed: %v", indexFile.Name(), e)
+ if indexSize, err = verifyIndexFileIntegrity(indexFile); err != nil {
+ return 0, fmt.Errorf("verifyIndexFileIntegrity %s failed: %v", indexFile.Name(), err)
}
if indexSize == 0 {
return 0, nil
}
+ healthyIndexSize := indexSize
+ for i := 1; i <= 10 && indexSize >= int64(i)*NeedleMapEntrySize; i++ {
+ // check and fix last 10 entries
+ lastAppendAtNs, err = doCheckAndFixVolumeData(v, indexFile, indexSize-int64(i)*NeedleMapEntrySize)
+ if err == io.EOF {
+ healthyIndexSize = indexSize - int64(i)*NeedleMapEntrySize
+ continue
+ }
+ if err != ErrorSizeMismatch {
+ break
+ }
+ }
+ if healthyIndexSize < indexSize {
+ glog.Warningf("CheckAndFixVolumeDataIntegrity truncate idx file %s from %d to %d", indexFile.Name(), indexSize, healthyIndexSize)
+ err = indexFile.Truncate(healthyIndexSize)
+ if err != nil {
+ glog.Warningf("CheckAndFixVolumeDataIntegrity truncate idx file %s from %d to %d: %v", indexFile.Name(), indexSize, healthyIndexSize, err)
+ }
+ }
+ return
+}
+
+func doCheckAndFixVolumeData(v *Volume, indexFile *os.File, indexOffset int64) (lastAppendAtNs uint64, err error) {
var lastIdxEntry []byte
- if lastIdxEntry, e = readIndexEntryAtOffset(indexFile, indexSize-NeedleMapEntrySize); e != nil {
- return 0, fmt.Errorf("readLastIndexEntry %s failed: %v", indexFile.Name(), e)
+ if lastIdxEntry, err = readIndexEntryAtOffset(indexFile, indexOffset); err != nil {
+ return 0, fmt.Errorf("readLastIndexEntry %s failed: %v", indexFile.Name(), err)
}
key, offset, size := idx.IdxFileEntry(lastIdxEntry)
if offset.IsZero() {
@@ -29,15 +54,15 @@ func CheckVolumeDataIntegrity(v *Volume, indexFile *os.File) (lastAppendAtNs uin
}
if size < 0 {
// read the deletion entry
- if lastAppendAtNs, e = verifyDeletedNeedleIntegrity(v.DataBackend, v.Version(), key); e != nil {
- return lastAppendAtNs, fmt.Errorf("verifyNeedleIntegrity %s failed: %v", indexFile.Name(), e)
+ if lastAppendAtNs, err = verifyDeletedNeedleIntegrity(v.DataBackend, v.Version(), key); err != nil {
+ return lastAppendAtNs, fmt.Errorf("verifyNeedleIntegrity %s failed: %v", indexFile.Name(), err)
}
} else {
- if lastAppendAtNs, e = verifyNeedleIntegrity(v.DataBackend, v.Version(), offset.ToAcutalOffset(), key, size); e != nil {
- return lastAppendAtNs, fmt.Errorf("verifyNeedleIntegrity %s failed: %v", indexFile.Name(), e)
+ if lastAppendAtNs, err = verifyNeedleIntegrity(v.DataBackend, v.Version(), offset.ToAcutalOffset(), key, size); err != nil {
+ return lastAppendAtNs, err
}
}
- return
+ return lastAppendAtNs, nil
}
func verifyIndexFileIntegrity(indexFile *os.File) (indexSize int64, err error) {
@@ -60,7 +85,44 @@ func readIndexEntryAtOffset(indexFile *os.File, offset int64) (bytes []byte, err
}
func verifyNeedleIntegrity(datFile backend.BackendStorageFile, v needle.Version, offset int64, key NeedleId, size Size) (lastAppendAtNs uint64, err error) {
- n := new(needle.Needle)
+ n, _, _, err := needle.ReadNeedleHeader(datFile, v, offset)
+ if err == io.EOF {
+ return 0, err
+ }
+ if err != nil {
+ return 0, fmt.Errorf("read %s at %d", datFile.Name(), offset)
+ }
+ if n.Size != size {
+ return 0, ErrorSizeMismatch
+ }
+ if v == needle.Version3 {
+ bytes := make([]byte, TimestampSize)
+ _, err = datFile.ReadAt(bytes, offset+NeedleHeaderSize+int64(size)+needle.NeedleChecksumSize)
+ if err == io.EOF {
+ return 0, err
+ }
+ if err != nil {
+ return 0, fmt.Errorf("verifyNeedleIntegrity check %s entry offset %d size %d: %v", datFile.Name(), offset, size, err)
+ }
+ n.AppendAtNs = util.BytesToUint64(bytes)
+ fileTailOffset := offset + needle.GetActualSize(size, v)
+ fileSize, _, err := datFile.GetStat()
+ if err != nil {
+ return 0, fmt.Errorf("stat file %s: %v", datFile.Name(), err)
+ }
+ if fileSize == fileTailOffset {
+ return n.AppendAtNs, nil
+ }
+ if fileSize > fileTailOffset {
+ glog.Warningf("Truncate %s from %d bytes to %d bytes!", datFile.Name(), fileSize, fileTailOffset)
+ err = datFile.Truncate(fileTailOffset)
+ if err == nil {
+ return n.AppendAtNs, nil
+ }
+ return n.AppendAtNs, fmt.Errorf("truncate file %s: %v", datFile.Name(), err)
+ }
+ glog.Warningf("data file %s has %d bytes, less than expected %d bytes!", datFile.Name(), fileSize, fileTailOffset)
+ }
if err = n.ReadData(datFile, offset, size, v); err != nil {
return n.AppendAtNs, fmt.Errorf("read data [%d,%d) : %v", offset, offset+int64(size), err)
}
diff --git a/weed/storage/volume_loading.go b/weed/storage/volume_loading.go
index 73e2de02b..05684cbdb 100644
--- a/weed/storage/volume_loading.go
+++ b/weed/storage/volume_loading.go
@@ -89,7 +89,7 @@ func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool, needleMapKind
return fmt.Errorf("cannot write Volume Index %s.idx: %v", fileName, err)
}
}
- if v.lastAppendAtNs, err = CheckVolumeDataIntegrity(v, indexFile); err != nil {
+ if v.lastAppendAtNs, err = CheckAndFixVolumeDataIntegrity(v, indexFile); err != nil {
v.noWriteOrDelete = true
glog.V(0).Infof("volumeDataIntegrityChecking failed %v", err)
}
diff --git a/weed/storage/volume_read_write.go b/weed/storage/volume_read_write.go
index 94c1d0ea1..869796a3f 100644
--- a/weed/storage/volume_read_write.go
+++ b/weed/storage/volume_read_write.go
@@ -17,6 +17,7 @@ import (
var ErrorNotFound = errors.New("not found")
var ErrorDeleted = errors.New("already deleted")
+var ErrorSizeMismatch = errors.New("size mismatch")
// isFileUnchanged checks whether this needle to write is same as last one.
// It requires serialized access in the same volume.
@@ -55,16 +56,21 @@ func (v *Volume) Destroy() (err error) {
}
}
v.Close()
- os.Remove(v.FileName() + ".dat")
- os.Remove(v.FileName() + ".idx")
- os.Remove(v.FileName() + ".vif")
- os.Remove(v.FileName() + ".sdx")
- os.Remove(v.FileName() + ".cpd")
- os.Remove(v.FileName() + ".cpx")
- os.RemoveAll(v.FileName() + ".ldb")
+ removeVolumeFiles(v.FileName())
return
}
+func removeVolumeFiles(filename string) {
+ os.Remove(filename + ".dat")
+ os.Remove(filename + ".idx")
+ os.Remove(filename + ".vif")
+ os.Remove(filename + ".sdx")
+ os.Remove(filename + ".cpd")
+ os.Remove(filename + ".cpx")
+ os.RemoveAll(filename + ".ldb")
+ os.Remove(filename + ".note")
+}
+
func (v *Volume) asyncRequestAppend(request *needle.AsyncRequest) {
v.asyncRequestsChan <- request
}
@@ -274,6 +280,9 @@ func (v *Volume) readNeedle(n *needle.Needle, readOption *ReadOption) (int, erro
return 0, nil
}
err := n.ReadData(v.DataBackend, nv.Offset.ToAcutalOffset(), readSize, v.Version())
+ if err == needle.ErrorSizeMismatch && OffsetSize == 4 {
+ err = n.ReadData(v.DataBackend, nv.Offset.ToAcutalOffset()+int64(MaxPossibleVolumeSize), readSize, v.Version())
+ }
if err != nil {
return 0, err
}