diff options
Diffstat (limited to 'weed/storage')
| -rw-r--r-- | weed/storage/disk_location.go | 24 | ||||
| -rw-r--r-- | weed/storage/needle/needle_read_write.go | 7 | ||||
| -rw-r--r-- | weed/storage/store.go | 40 | ||||
| -rw-r--r-- | weed/storage/volume_checking.go | 84 | ||||
| -rw-r--r-- | weed/storage/volume_loading.go | 2 | ||||
| -rw-r--r-- | weed/storage/volume_read_write.go | 23 |
6 files changed, 145 insertions, 35 deletions
diff --git a/weed/storage/disk_location.go b/weed/storage/disk_location.go index ed57aa54b..775ebf092 100644 --- a/weed/storage/disk_location.go +++ b/weed/storage/disk_location.go @@ -13,14 +13,16 @@ import ( "github.com/chrislusf/seaweedfs/weed/stats" "github.com/chrislusf/seaweedfs/weed/storage/erasure_coding" "github.com/chrislusf/seaweedfs/weed/storage/needle" + "github.com/chrislusf/seaweedfs/weed/util" ) type DiskLocation struct { - Directory string - MaxVolumeCount int - MinFreeSpacePercent float32 - volumes map[needle.VolumeId]*Volume - volumesLock sync.RWMutex + Directory string + MaxVolumeCount int + OriginalMaxVolumeCount int + MinFreeSpacePercent float32 + volumes map[needle.VolumeId]*Volume + volumesLock sync.RWMutex // erasure coding ecVolumes map[needle.VolumeId]*erasure_coding.EcVolume @@ -30,7 +32,7 @@ type DiskLocation struct { } func NewDiskLocation(dir string, maxVolumeCount int, minFreeSpacePercent float32) *DiskLocation { - location := &DiskLocation{Directory: dir, MaxVolumeCount: maxVolumeCount, MinFreeSpacePercent: minFreeSpacePercent} + location := &DiskLocation{Directory: dir, MaxVolumeCount: maxVolumeCount, OriginalMaxVolumeCount: maxVolumeCount, MinFreeSpacePercent: minFreeSpacePercent} location.volumes = make(map[needle.VolumeId]*Volume) location.ecVolumes = make(map[needle.VolumeId]*erasure_coding.EcVolume) go location.CheckDiskSpace() @@ -60,6 +62,14 @@ func parseCollectionVolumeId(base string) (collection string, vid needle.VolumeI func (l *DiskLocation) loadExistingVolume(fileInfo os.FileInfo, needleMapKind NeedleMapType) bool { name := fileInfo.Name() if !fileInfo.IsDir() && strings.HasSuffix(name, ".idx") { + name := name[:len(name)-len(".idx")] + noteFile := l.Directory + "/" + name + ".note" + if util.FileExists(noteFile) { + note, _ := ioutil.ReadFile(noteFile) + glog.Warningf("volume %s was not completed: %s", name, string(note)) + removeVolumeFiles(l.Directory + "/" + name) + return false + } vid, collection, err := l.volumeIdFromPath(fileInfo) if err != nil { glog.Warningf("get volume id failed, %s, err : %s", name, err) @@ -85,7 +95,7 @@ func (l *DiskLocation) loadExistingVolume(fileInfo os.FileInfo, needleMapKind Ne size, _, _ := v.FileStat() glog.V(0).Infof("data file %s, replicaPlacement=%s v=%d size=%d ttl=%s", - l.Directory+"/"+name, v.ReplicaPlacement, v.Version(), size, v.Ttl.String()) + l.Directory+"/"+name+".dat", v.ReplicaPlacement, v.Version(), size, v.Ttl.String()) return true } return false diff --git a/weed/storage/needle/needle_read_write.go b/weed/storage/needle/needle_read_write.go index 89fc85b0d..e758a6fee 100644 --- a/weed/storage/needle/needle_read_write.go +++ b/weed/storage/needle/needle_read_write.go @@ -24,6 +24,8 @@ const ( TtlBytesLength = 2 ) +var ErrorSizeMismatch = errors.New("size mismatch") + func (n *Needle) DiskSize(version Version) int64 { return GetActualSize(n.Size, version) } @@ -168,6 +170,11 @@ func ReadNeedleBlob(r backend.BackendStorageFile, offset int64, size Size, versi func (n *Needle) ReadBytes(bytes []byte, offset int64, size Size, version Version) (err error) { n.ParseNeedleHeader(bytes) if n.Size != size { + // cookie is not always passed in for this API. Use size to do preliminary checking. + if OffsetSize == 4 && offset < int64(MaxPossibleVolumeSize) { + glog.Errorf("entry not found1: offset %d found id %x size %d, expected size %d", offset, n.Id, n.Size, size) + return ErrorSizeMismatch + } return fmt.Errorf("entry not found: offset %d found id %x size %d, expected size %d", offset, n.Id, n.Size, size) } switch version { diff --git a/weed/storage/store.go b/weed/storage/store.go index b9fcfcba9..38f167cef 100644 --- a/weed/storage/store.go +++ b/weed/storage/store.go @@ -194,13 +194,19 @@ func (s *Store) SetDataCenter(dataCenter string) { func (s *Store) SetRack(rack string) { s.rack = rack } +func (s *Store) GetDataCenter() string { + return s.dataCenter +} +func (s *Store) GetRack() string { + return s.rack +} func (s *Store) CollectHeartbeat() *master_pb.Heartbeat { var volumeMessages []*master_pb.VolumeInformationMessage maxVolumeCount := 0 var maxFileKey NeedleId collectionVolumeSize := make(map[string]uint64) - collectionVolumeReadOnlyCount := make(map[string]uint8) + collectionVolumeReadOnlyCount := make(map[string]map[string]uint8) for _, location := range s.Locations { var deleteVids []needle.VolumeId maxVolumeCount = maxVolumeCount + location.MaxVolumeCount @@ -220,11 +226,24 @@ func (s *Store) CollectHeartbeat() *master_pb.Heartbeat { } } collectionVolumeSize[v.Collection] += volumeMessage.Size + if _, exist := collectionVolumeReadOnlyCount[v.Collection]; !exist { + collectionVolumeReadOnlyCount[v.Collection] = map[string]uint8{ + "IsReadOnly": 0, + "noWriteOrDelete": 0, + "noWriteCanDelete": 0, + "isDiskSpaceLow": 0, + } + } if v.IsReadOnly() { - collectionVolumeReadOnlyCount[v.Collection] += 1 - } else { - if _, exist := collectionVolumeReadOnlyCount[v.Collection]; !exist { - collectionVolumeReadOnlyCount[v.Collection] = 0 + collectionVolumeReadOnlyCount[v.Collection]["IsReadOnly"] += 1 + if v.noWriteOrDelete { + collectionVolumeReadOnlyCount[v.Collection]["noWriteOrDelete"] += 1 + } + if v.noWriteCanDelete { + collectionVolumeReadOnlyCount[v.Collection]["noWriteCanDelete"] += 1 + } + if v.location.isDiskSpaceLow { + collectionVolumeReadOnlyCount[v.Collection]["isDiskSpaceLow"] += 1 } } } @@ -251,8 +270,10 @@ func (s *Store) CollectHeartbeat() *master_pb.Heartbeat { stats.VolumeServerDiskSizeGauge.WithLabelValues(col, "normal").Set(float64(size)) } - for col, count := range collectionVolumeReadOnlyCount { - stats.VolumeServerReadOnlyVolumeGauge.WithLabelValues(col, "normal").Set(float64(count)) + for col, types := range collectionVolumeReadOnlyCount { + for t, count := range types { + stats.VolumeServerReadOnlyVolumeGauge.WithLabelValues(col, t).Set(float64(count)) + } } return &master_pb.Heartbeat{ @@ -440,7 +461,8 @@ func (s *Store) GetVolumeSizeLimit() uint64 { func (s *Store) MaybeAdjustVolumeMax() (hasChanges bool) { volumeSizeLimit := s.GetVolumeSizeLimit() for _, diskLocation := range s.Locations { - if diskLocation.MaxVolumeCount == 0 { + if diskLocation.OriginalMaxVolumeCount == 0 { + currentMaxVolumeCount := diskLocation.MaxVolumeCount diskStatus := stats.NewDiskStatus(diskLocation.Directory) unusedSpace := diskLocation.UnUsedSpace(volumeSizeLimit) unclaimedSpaces := int64(diskStatus.Free) - int64(unusedSpace) @@ -452,7 +474,7 @@ func (s *Store) MaybeAdjustVolumeMax() (hasChanges bool) { diskLocation.MaxVolumeCount = maxVolumeCount glog.V(0).Infof("disk %s max %d unclaimedSpace:%dMB, unused:%dMB volumeSizeLimit:%dMB", diskLocation.Directory, maxVolumeCount, unclaimedSpaces/1024/1024, unusedSpace/1024/1024, volumeSizeLimit/1024/1024) - hasChanges = true + hasChanges = hasChanges || currentMaxVolumeCount != diskLocation.MaxVolumeCount } } return diff --git a/weed/storage/volume_checking.go b/weed/storage/volume_checking.go index e42fb238b..00e04047f 100644 --- a/weed/storage/volume_checking.go +++ b/weed/storage/volume_checking.go @@ -2,8 +2,10 @@ package storage import ( "fmt" + "io" "os" + "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/storage/backend" "github.com/chrislusf/seaweedfs/weed/storage/idx" "github.com/chrislusf/seaweedfs/weed/storage/needle" @@ -11,17 +13,40 @@ import ( "github.com/chrislusf/seaweedfs/weed/util" ) -func CheckVolumeDataIntegrity(v *Volume, indexFile *os.File) (lastAppendAtNs uint64, e error) { +func CheckAndFixVolumeDataIntegrity(v *Volume, indexFile *os.File) (lastAppendAtNs uint64, err error) { var indexSize int64 - if indexSize, e = verifyIndexFileIntegrity(indexFile); e != nil { - return 0, fmt.Errorf("verifyIndexFileIntegrity %s failed: %v", indexFile.Name(), e) + if indexSize, err = verifyIndexFileIntegrity(indexFile); err != nil { + return 0, fmt.Errorf("verifyIndexFileIntegrity %s failed: %v", indexFile.Name(), err) } if indexSize == 0 { return 0, nil } + healthyIndexSize := indexSize + for i := 1; i <= 10 && indexSize >= int64(i)*NeedleMapEntrySize; i++ { + // check and fix last 10 entries + lastAppendAtNs, err = doCheckAndFixVolumeData(v, indexFile, indexSize-int64(i)*NeedleMapEntrySize) + if err == io.EOF { + healthyIndexSize = indexSize - int64(i)*NeedleMapEntrySize + continue + } + if err != ErrorSizeMismatch { + break + } + } + if healthyIndexSize < indexSize { + glog.Warningf("CheckAndFixVolumeDataIntegrity truncate idx file %s from %d to %d", indexFile.Name(), indexSize, healthyIndexSize) + err = indexFile.Truncate(healthyIndexSize) + if err != nil { + glog.Warningf("CheckAndFixVolumeDataIntegrity truncate idx file %s from %d to %d: %v", indexFile.Name(), indexSize, healthyIndexSize, err) + } + } + return +} + +func doCheckAndFixVolumeData(v *Volume, indexFile *os.File, indexOffset int64) (lastAppendAtNs uint64, err error) { var lastIdxEntry []byte - if lastIdxEntry, e = readIndexEntryAtOffset(indexFile, indexSize-NeedleMapEntrySize); e != nil { - return 0, fmt.Errorf("readLastIndexEntry %s failed: %v", indexFile.Name(), e) + if lastIdxEntry, err = readIndexEntryAtOffset(indexFile, indexOffset); err != nil { + return 0, fmt.Errorf("readLastIndexEntry %s failed: %v", indexFile.Name(), err) } key, offset, size := idx.IdxFileEntry(lastIdxEntry) if offset.IsZero() { @@ -29,15 +54,15 @@ func CheckVolumeDataIntegrity(v *Volume, indexFile *os.File) (lastAppendAtNs uin } if size < 0 { // read the deletion entry - if lastAppendAtNs, e = verifyDeletedNeedleIntegrity(v.DataBackend, v.Version(), key); e != nil { - return lastAppendAtNs, fmt.Errorf("verifyNeedleIntegrity %s failed: %v", indexFile.Name(), e) + if lastAppendAtNs, err = verifyDeletedNeedleIntegrity(v.DataBackend, v.Version(), key); err != nil { + return lastAppendAtNs, fmt.Errorf("verifyNeedleIntegrity %s failed: %v", indexFile.Name(), err) } } else { - if lastAppendAtNs, e = verifyNeedleIntegrity(v.DataBackend, v.Version(), offset.ToAcutalOffset(), key, size); e != nil { - return lastAppendAtNs, fmt.Errorf("verifyNeedleIntegrity %s failed: %v", indexFile.Name(), e) + if lastAppendAtNs, err = verifyNeedleIntegrity(v.DataBackend, v.Version(), offset.ToAcutalOffset(), key, size); err != nil { + return lastAppendAtNs, err } } - return + return lastAppendAtNs, nil } func verifyIndexFileIntegrity(indexFile *os.File) (indexSize int64, err error) { @@ -60,7 +85,44 @@ func readIndexEntryAtOffset(indexFile *os.File, offset int64) (bytes []byte, err } func verifyNeedleIntegrity(datFile backend.BackendStorageFile, v needle.Version, offset int64, key NeedleId, size Size) (lastAppendAtNs uint64, err error) { - n := new(needle.Needle) + n, _, _, err := needle.ReadNeedleHeader(datFile, v, offset) + if err == io.EOF { + return 0, err + } + if err != nil { + return 0, fmt.Errorf("read %s at %d", datFile.Name(), offset) + } + if n.Size != size { + return 0, ErrorSizeMismatch + } + if v == needle.Version3 { + bytes := make([]byte, TimestampSize) + _, err = datFile.ReadAt(bytes, offset+NeedleHeaderSize+int64(size)+needle.NeedleChecksumSize) + if err == io.EOF { + return 0, err + } + if err != nil { + return 0, fmt.Errorf("verifyNeedleIntegrity check %s entry offset %d size %d: %v", datFile.Name(), offset, size, err) + } + n.AppendAtNs = util.BytesToUint64(bytes) + fileTailOffset := offset + needle.GetActualSize(size, v) + fileSize, _, err := datFile.GetStat() + if err != nil { + return 0, fmt.Errorf("stat file %s: %v", datFile.Name(), err) + } + if fileSize == fileTailOffset { + return n.AppendAtNs, nil + } + if fileSize > fileTailOffset { + glog.Warningf("Truncate %s from %d bytes to %d bytes!", datFile.Name(), fileSize, fileTailOffset) + err = datFile.Truncate(fileTailOffset) + if err == nil { + return n.AppendAtNs, nil + } + return n.AppendAtNs, fmt.Errorf("truncate file %s: %v", datFile.Name(), err) + } + glog.Warningf("data file %s has %d bytes, less than expected %d bytes!", datFile.Name(), fileSize, fileTailOffset) + } if err = n.ReadData(datFile, offset, size, v); err != nil { return n.AppendAtNs, fmt.Errorf("read data [%d,%d) : %v", offset, offset+int64(size), err) } diff --git a/weed/storage/volume_loading.go b/weed/storage/volume_loading.go index 73e2de02b..05684cbdb 100644 --- a/weed/storage/volume_loading.go +++ b/weed/storage/volume_loading.go @@ -89,7 +89,7 @@ func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool, needleMapKind return fmt.Errorf("cannot write Volume Index %s.idx: %v", fileName, err) } } - if v.lastAppendAtNs, err = CheckVolumeDataIntegrity(v, indexFile); err != nil { + if v.lastAppendAtNs, err = CheckAndFixVolumeDataIntegrity(v, indexFile); err != nil { v.noWriteOrDelete = true glog.V(0).Infof("volumeDataIntegrityChecking failed %v", err) } diff --git a/weed/storage/volume_read_write.go b/weed/storage/volume_read_write.go index 94c1d0ea1..869796a3f 100644 --- a/weed/storage/volume_read_write.go +++ b/weed/storage/volume_read_write.go @@ -17,6 +17,7 @@ import ( var ErrorNotFound = errors.New("not found") var ErrorDeleted = errors.New("already deleted") +var ErrorSizeMismatch = errors.New("size mismatch") // isFileUnchanged checks whether this needle to write is same as last one. // It requires serialized access in the same volume. @@ -55,16 +56,21 @@ func (v *Volume) Destroy() (err error) { } } v.Close() - os.Remove(v.FileName() + ".dat") - os.Remove(v.FileName() + ".idx") - os.Remove(v.FileName() + ".vif") - os.Remove(v.FileName() + ".sdx") - os.Remove(v.FileName() + ".cpd") - os.Remove(v.FileName() + ".cpx") - os.RemoveAll(v.FileName() + ".ldb") + removeVolumeFiles(v.FileName()) return } +func removeVolumeFiles(filename string) { + os.Remove(filename + ".dat") + os.Remove(filename + ".idx") + os.Remove(filename + ".vif") + os.Remove(filename + ".sdx") + os.Remove(filename + ".cpd") + os.Remove(filename + ".cpx") + os.RemoveAll(filename + ".ldb") + os.Remove(filename + ".note") +} + func (v *Volume) asyncRequestAppend(request *needle.AsyncRequest) { v.asyncRequestsChan <- request } @@ -274,6 +280,9 @@ func (v *Volume) readNeedle(n *needle.Needle, readOption *ReadOption) (int, erro return 0, nil } err := n.ReadData(v.DataBackend, nv.Offset.ToAcutalOffset(), readSize, v.Version()) + if err == needle.ErrorSizeMismatch && OffsetSize == 4 { + err = n.ReadData(v.DataBackend, nv.Offset.ToAcutalOffset()+int64(MaxPossibleVolumeSize), readSize, v.Version()) + } if err != nil { return 0, err } |
