| author    | bingoohuang <bingoo.huang@gmail.com> | 2021-04-26 17:19:35 +0800 |
|-----------|--------------------------------------|---------------------------|
| committer | bingoohuang <bingoo.huang@gmail.com> | 2021-04-26 17:19:35 +0800 |
| commit    | d861cbd81b75b6684c971ac00e33685e6575b833 | |
| tree      | 301805fef4aa5d0096bfb1510536f7a009b661e7 | /weed/storage |
| parent    | 70da715d8d917527291b35fb069fac077d17b868 | |
| parent    | 4ee58922eff61a5a4ca29c0b4829b097a498549e | |
| download  | seaweedfs-d861cbd81b75b6684c971ac00e33685e6575b833.tar.xz | seaweedfs-d861cbd81b75b6684c971ac00e33685e6575b833.zip |
Merge branch 'master' of https://github.com/bingoohuang/seaweedfs
Diffstat (limited to 'weed/storage')
67 files changed, 2394 insertions, 994 deletions
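The core API change running through the hunks below is a reshaping of the backend storage abstraction: BackendStorageFile gains a Sync() error method, and BackendStorageFactory.BuildStorage now takes a configPrefix so factories read flat, fully-qualified configuration keys instead of viper sub-trees. The following is a minimal Go sketch of the resulting interfaces, assembled from the diff; the read/write/truncate members and the BackendStorage body are not shown in the hunks and are stubbed or elided here as assumptions.

package backend

import (
    "io"
    "time"
)

type StorageType string

// StringProperties is the key/value lookup the factories receive (per the diff).
type StringProperties interface {
    GetString(key string) string
}

// BackendStorage's body is not shown in the hunks; stubbed for this sketch.
type BackendStorage interface{}

type BackendStorageFile interface {
    // ... read/write/truncate methods elided (not visible in the hunk) ...
    io.Closer
    GetStat() (datSize int64, modTime time.Time, err error)
    Name() string
    Sync() error // new: DiskFile flushes via *os.File.Sync(); the S3 and mmap backends return nil
}

type BackendStorageFactory interface {
    StorageType() StorageType
    // configPrefix is new: LoadConfiguration passes a fully-qualified prefix such as
    // "storage.backend.s3.default." so implementations call
    // configuration.GetString(configPrefix + "bucket") directly;
    // LoadFromPbStorageBackends passes "".
    BuildStorage(configuration StringProperties, configPrefix string, id string) (BackendStorage, error)
}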
diff --git a/weed/storage/backend/backend.go b/weed/storage/backend/backend.go index 6ea850543..2dc61d02e 100644 --- a/weed/storage/backend/backend.go +++ b/weed/storage/backend/backend.go @@ -1,6 +1,7 @@ package backend import ( + "github.com/chrislusf/seaweedfs/weed/util" "io" "os" "strings" @@ -9,7 +10,6 @@ import ( "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/pb/master_pb" "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb" - "github.com/spf13/viper" ) type BackendStorageFile interface { @@ -19,6 +19,7 @@ type BackendStorageFile interface { io.Closer GetStat() (datSize int64, modTime time.Time, err error) Name() string + Sync() error } type BackendStorage interface { @@ -35,7 +36,7 @@ type StringProperties interface { type StorageType string type BackendStorageFactory interface { StorageType() StorageType - BuildStorage(configuration StringProperties, id string) (BackendStorage, error) + BuildStorage(configuration StringProperties, configPrefix string, id string) (BackendStorage, error) } var ( @@ -44,23 +45,24 @@ var ( ) // used by master to load remote storage configurations -func LoadConfiguration(config *viper.Viper) { +func LoadConfiguration(config *util.ViperProxy) { StorageBackendPrefix := "storage.backend" - backendSub := config.Sub(StorageBackendPrefix) - for backendTypeName := range config.GetStringMap(StorageBackendPrefix) { backendStorageFactory, found := BackendStorageFactories[StorageType(backendTypeName)] if !found { glog.Fatalf("backend storage type %s not found", backendTypeName) } - backendTypeSub := backendSub.Sub(backendTypeName) - for backendStorageId := range backendSub.GetStringMap(backendTypeName) { - if !backendTypeSub.GetBool(backendStorageId + ".enabled") { + for backendStorageId := range config.GetStringMap(StorageBackendPrefix + "." + backendTypeName) { + if !config.GetBool(StorageBackendPrefix + "." + backendTypeName + "." + backendStorageId + ".enabled") { + continue + } + if _, found := BackendStorages[backendTypeName+"."+backendStorageId]; found { continue } - backendStorage, buildErr := backendStorageFactory.BuildStorage(backendTypeSub.Sub(backendStorageId), backendStorageId) + backendStorage, buildErr := backendStorageFactory.BuildStorage(config, + StorageBackendPrefix+"."+backendTypeName+"."+backendStorageId+".", backendStorageId) if buildErr != nil { glog.Fatalf("fail to create backend storage %s.%s", backendTypeName, backendStorageId) } @@ -82,7 +84,10 @@ func LoadFromPbStorageBackends(storageBackends []*master_pb.StorageBackend) { glog.Warningf("storage type %s not found", storageBackend.Type) continue } - backendStorage, buildErr := backendStorageFactory.BuildStorage(newProperties(storageBackend.Properties), storageBackend.Id) + if _, found := BackendStorages[storageBackend.Type+"."+storageBackend.Id]; found { + continue + } + backendStorage, buildErr := backendStorageFactory.BuildStorage(newProperties(storageBackend.Properties), "", storageBackend.Id) if buildErr != nil { glog.Fatalf("fail to create backend storage %s.%s", storageBackend.Type, storageBackend.Id) } diff --git a/weed/storage/backend/disk_file.go b/weed/storage/backend/disk_file.go index c4b3caffb..3b42429cf 100644 --- a/weed/storage/backend/disk_file.go +++ b/weed/storage/backend/disk_file.go @@ -1,6 +1,8 @@ package backend import ( + "github.com/chrislusf/seaweedfs/weed/glog" + . 
"github.com/chrislusf/seaweedfs/weed/storage/types" "os" "time" ) @@ -12,12 +14,25 @@ var ( type DiskFile struct { File *os.File fullFilePath string + fileSize int64 + modTime time.Time } func NewDiskFile(f *os.File) *DiskFile { + stat, err := f.Stat() + if err != nil { + glog.Fatalf("stat file %s: %v", f.Name(), err) + } + offset := stat.Size() + if offset%NeedlePaddingSize != 0 { + offset = offset + (NeedlePaddingSize - offset%NeedlePaddingSize) + } + return &DiskFile{ fullFilePath: f.Name(), File: f, + fileSize: offset, + modTime: stat.ModTime(), } } @@ -26,11 +41,28 @@ func (df *DiskFile) ReadAt(p []byte, off int64) (n int, err error) { } func (df *DiskFile) WriteAt(p []byte, off int64) (n int, err error) { - return df.File.WriteAt(p, off) + n, err = df.File.WriteAt(p, off) + if err == nil { + waterMark := off + int64(n) + if waterMark > df.fileSize { + df.fileSize = waterMark + df.modTime = time.Now() + } + } + return +} + +func (df *DiskFile) Write(p []byte) (n int, err error) { + return df.WriteAt(p, df.fileSize) } func (df *DiskFile) Truncate(off int64) error { - return df.File.Truncate(off) + err := df.File.Truncate(off) + if err == nil { + df.fileSize = off + df.modTime = time.Now() + } + return err } func (df *DiskFile) Close() error { @@ -38,13 +70,13 @@ func (df *DiskFile) Close() error { } func (df *DiskFile) GetStat() (datSize int64, modTime time.Time, err error) { - stat, e := df.File.Stat() - if e == nil { - return stat.Size(), stat.ModTime(), nil - } - return 0, time.Time{}, err + return df.fileSize, df.modTime, nil } func (df *DiskFile) Name() string { return df.fullFilePath } + +func (df *DiskFile) Sync() error { + return df.File.Sync() +} diff --git a/weed/storage/backend/memory_map/memory_map_backend.go b/weed/storage/backend/memory_map/memory_map_backend.go index 03e7308d0..8ff03d9af 100644 --- a/weed/storage/backend/memory_map/memory_map_backend.go +++ b/weed/storage/backend/memory_map/memory_map_backend.go @@ -3,12 +3,10 @@ package memory_map import ( "os" "time" - - "github.com/chrislusf/seaweedfs/weed/storage/backend" ) var ( - _ backend.BackendStorageFile = &MemoryMappedFile{} +// _ backend.BackendStorageFile = &MemoryMappedFile{} // remove this to break import cycle ) type MemoryMappedFile struct { @@ -58,3 +56,7 @@ func (mmf *MemoryMappedFile) GetStat() (datSize int64, modTime time.Time, err er func (mmf *MemoryMappedFile) Name() string { return mmf.mm.File.Name() } + +func (mm *MemoryMappedFile) Sync() error { + return nil +} diff --git a/weed/storage/backend/s3_backend/s3_backend.go b/weed/storage/backend/s3_backend/s3_backend.go index 9f03cfa81..4706c9334 100644 --- a/weed/storage/backend/s3_backend/s3_backend.go +++ b/weed/storage/backend/s3_backend/s3_backend.go @@ -26,8 +26,8 @@ type S3BackendFactory struct { func (factory *S3BackendFactory) StorageType() backend.StorageType { return backend.StorageType("s3") } -func (factory *S3BackendFactory) BuildStorage(configuration backend.StringProperties, id string) (backend.BackendStorage, error) { - return newS3BackendStorage(configuration, id) +func (factory *S3BackendFactory) BuildStorage(configuration backend.StringProperties, configPrefix string, id string) (backend.BackendStorage, error) { + return newS3BackendStorage(configuration, configPrefix, id) } type S3BackendStorage struct { @@ -36,17 +36,20 @@ type S3BackendStorage struct { aws_secret_access_key string region string bucket string + endpoint string conn s3iface.S3API } -func newS3BackendStorage(configuration backend.StringProperties, id string) (s 
*S3BackendStorage, err error) { +func newS3BackendStorage(configuration backend.StringProperties, configPrefix string, id string) (s *S3BackendStorage, err error) { s = &S3BackendStorage{} s.id = id - s.aws_access_key_id = configuration.GetString("aws_access_key_id") - s.aws_secret_access_key = configuration.GetString("aws_secret_access_key") - s.region = configuration.GetString("region") - s.bucket = configuration.GetString("bucket") - s.conn, err = createSession(s.aws_access_key_id, s.aws_secret_access_key, s.region) + s.aws_access_key_id = configuration.GetString(configPrefix + "aws_access_key_id") + s.aws_secret_access_key = configuration.GetString(configPrefix + "aws_secret_access_key") + s.region = configuration.GetString(configPrefix + "region") + s.bucket = configuration.GetString(configPrefix + "bucket") + s.endpoint = configuration.GetString(configPrefix + "endpoint") + + s.conn, err = createSession(s.aws_access_key_id, s.aws_secret_access_key, s.region, s.endpoint) glog.V(0).Infof("created backend storage s3.%s for region %s bucket %s", s.id, s.region, s.bucket) return @@ -58,6 +61,7 @@ func (s *S3BackendStorage) ToProperties() map[string]string { m["aws_secret_access_key"] = s.aws_secret_access_key m["region"] = s.region m["bucket"] = s.bucket + m["endpoint"] = s.endpoint return m } @@ -175,3 +179,7 @@ func (s3backendStorageFile S3BackendStorageFile) GetStat() (datSize int64, modTi func (s3backendStorageFile S3BackendStorageFile) Name() string { return s3backendStorageFile.key } + +func (s3backendStorageFile S3BackendStorageFile) Sync() error { + return nil +} diff --git a/weed/storage/backend/s3_backend/s3_sessions.go b/weed/storage/backend/s3_backend/s3_sessions.go index 5fdbcb66b..b8378c379 100644 --- a/weed/storage/backend/s3_backend/s3_sessions.go +++ b/weed/storage/backend/s3_backend/s3_sessions.go @@ -24,7 +24,7 @@ func getSession(region string) (s3iface.S3API, bool) { return sess, found } -func createSession(awsAccessKeyId, awsSecretAccessKey, region string) (s3iface.S3API, error) { +func createSession(awsAccessKeyId, awsSecretAccessKey, region, endpoint string) (s3iface.S3API, error) { sessionsLock.Lock() defer sessionsLock.Unlock() @@ -34,7 +34,9 @@ func createSession(awsAccessKeyId, awsSecretAccessKey, region string) (s3iface.S } config := &aws.Config{ - Region: aws.String(region), + Region: aws.String(region), + Endpoint: aws.String(endpoint), + S3ForcePathStyle: aws.Bool(true), } if awsAccessKeyId != "" && awsSecretAccessKey != "" { config.Credentials = credentials.NewStaticCredentials(awsAccessKeyId, awsSecretAccessKey, "") diff --git a/weed/storage/backend/volume_create.go b/weed/storage/backend/volume_create.go new file mode 100644 index 000000000..d4bd8e40f --- /dev/null +++ b/weed/storage/backend/volume_create.go @@ -0,0 +1,20 @@ +// +build !linux,!windows + +package backend + +import ( + "os" + + "github.com/chrislusf/seaweedfs/weed/glog" +) + +func CreateVolumeFile(fileName string, preallocate int64, memoryMapSizeMB uint32) (BackendStorageFile, error) { + file, e := os.OpenFile(fileName, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0644) + if e != nil { + return nil, e + } + if preallocate > 0 { + glog.V(2).Infof("Preallocated disk space for %s is not supported", fileName) + } + return NewDiskFile(file), nil +} diff --git a/weed/storage/volume_create_linux.go b/weed/storage/backend/volume_create_linux.go index ee599ac32..260c2c2a3 100644 --- a/weed/storage/volume_create_linux.go +++ b/weed/storage/backend/volume_create_linux.go @@ -1,23 +1,22 @@ // +build linux 
-package storage +package backend import ( "os" "syscall" "github.com/chrislusf/seaweedfs/weed/glog" - "github.com/chrislusf/seaweedfs/weed/storage/backend" ) -func createVolumeFile(fileName string, preallocate int64, memoryMapSizeMB uint32) (backend.BackendStorageFile, error) { +func CreateVolumeFile(fileName string, preallocate int64, memoryMapSizeMB uint32) (BackendStorageFile, error) { file, e := os.OpenFile(fileName, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0644) if e != nil { return nil, e } if preallocate != 0 { syscall.Fallocate(int(file.Fd()), 1, 0, preallocate) - glog.V(0).Infof("Preallocated %d bytes disk space for %s", preallocate, fileName) + glog.V(1).Infof("Preallocated %d bytes disk space for %s", preallocate, fileName) } - return backend.NewDiskFile(file), nil + return NewDiskFile(file), nil } diff --git a/weed/storage/volume_create_windows.go b/weed/storage/backend/volume_create_windows.go index e1c0b961f..7d40ec0d7 100644 --- a/weed/storage/volume_create_windows.go +++ b/weed/storage/backend/volume_create_windows.go @@ -1,17 +1,16 @@ // +build windows -package storage +package backend import ( "github.com/chrislusf/seaweedfs/weed/storage/backend/memory_map" "golang.org/x/sys/windows" "github.com/chrislusf/seaweedfs/weed/glog" - "github.com/chrislusf/seaweedfs/weed/storage/backend" "github.com/chrislusf/seaweedfs/weed/storage/backend/memory_map/os_overloads" ) -func createVolumeFile(fileName string, preallocate int64, memoryMapSizeMB uint32) (backend.BackendStorageFile, error) { +func CreateVolumeFile(fileName string, preallocate int64, memoryMapSizeMB uint32) (BackendStorageFile, error) { if preallocate > 0 { glog.V(0).Infof("Preallocated disk space for %s is not supported", fileName) } @@ -27,7 +26,7 @@ func createVolumeFile(fileName string, preallocate int64, memoryMapSizeMB uint32 if e != nil { return nil, e } - return backend.NewDiskFile(file), nil + return NewDiskFile(file), nil } } diff --git a/weed/storage/disk_location.go b/weed/storage/disk_location.go index e116fc715..ed4e00312 100644 --- a/weed/storage/disk_location.go +++ b/weed/storage/disk_location.go @@ -1,45 +1,68 @@ package storage import ( + "fmt" + "github.com/chrislusf/seaweedfs/weed/storage/types" "io/ioutil" "os" + "path/filepath" "strings" "sync" - - "fmt" + "time" "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/stats" "github.com/chrislusf/seaweedfs/weed/storage/erasure_coding" "github.com/chrislusf/seaweedfs/weed/storage/needle" + "github.com/chrislusf/seaweedfs/weed/util" ) type DiskLocation struct { - Directory string - MaxVolumeCount int - volumes map[needle.VolumeId]*Volume - volumesLock sync.RWMutex + Directory string + IdxDirectory string + DiskType types.DiskType + MaxVolumeCount int + OriginalMaxVolumeCount int + MinFreeSpacePercent float32 + volumes map[needle.VolumeId]*Volume + volumesLock sync.RWMutex // erasure coding ecVolumes map[needle.VolumeId]*erasure_coding.EcVolume ecVolumesLock sync.RWMutex + + isDiskSpaceLow bool } -func NewDiskLocation(dir string, maxVolumeCount int) *DiskLocation { - location := &DiskLocation{Directory: dir, MaxVolumeCount: maxVolumeCount} +func NewDiskLocation(dir string, maxVolumeCount int, minFreeSpacePercent float32, idxDir string, diskType types.DiskType) *DiskLocation { + dir = util.ResolvePath(dir) + if idxDir == "" { + idxDir = dir + } else { + idxDir = util.ResolvePath(idxDir) + } + location := &DiskLocation{ + Directory: dir, + IdxDirectory: idxDir, + DiskType: diskType, + MaxVolumeCount: maxVolumeCount, + 
OriginalMaxVolumeCount: maxVolumeCount, + MinFreeSpacePercent: minFreeSpacePercent, + } location.volumes = make(map[needle.VolumeId]*Volume) location.ecVolumes = make(map[needle.VolumeId]*erasure_coding.EcVolume) + go location.CheckDiskSpace() return location } -func (l *DiskLocation) volumeIdFromPath(dir os.FileInfo) (needle.VolumeId, string, error) { - name := dir.Name() - if !dir.IsDir() && strings.HasSuffix(name, ".idx") { - base := name[:len(name)-len(".idx")] +func volumeIdFromFileName(filename string) (needle.VolumeId, string, error) { + if isValidVolume(filename) { + base := filename[:len(filename)-4] collection, volumeId, err := parseCollectionVolumeId(base) return volumeId, collection, err } - return 0, "", fmt.Errorf("Path is not a volume: %s", name) + return 0, "", fmt.Errorf("file is not a volume: %s", filename) } func parseCollectionVolumeId(base string) (collection string, vid needle.VolumeId, err error) { @@ -51,38 +74,83 @@ func parseCollectionVolumeId(base string) (collection string, vid needle.VolumeI return collection, vol, err } -func (l *DiskLocation) loadExistingVolume(fileInfo os.FileInfo, needleMapKind NeedleMapType) { - name := fileInfo.Name() - if !fileInfo.IsDir() && strings.HasSuffix(name, ".idx") { - vid, collection, err := l.volumeIdFromPath(fileInfo) - if err == nil { - l.volumesLock.RLock() - _, found := l.volumes[vid] - l.volumesLock.RUnlock() - if !found { - if v, e := NewVolume(l.Directory, collection, vid, needleMapKind, nil, nil, 0, 0); e == nil { - l.volumesLock.Lock() - l.volumes[vid] = v - l.volumesLock.Unlock() - size, _, _ := v.FileStat() - glog.V(0).Infof("data file %s, replicaPlacement=%s v=%d size=%d ttl=%s", - l.Directory+"/"+name, v.ReplicaPlacement, v.Version(), size, v.Ttl.String()) - // println("volume", vid, "last append at", v.lastAppendAtNs) - } else { - glog.V(0).Infof("new volume %s error %s", name, e) - } - } - } +func isValidVolume(basename string) bool { + return strings.HasSuffix(basename, ".idx") || strings.HasSuffix(basename, ".vif") +} + +func getValidVolumeName(basename string) string { + if isValidVolume(basename) { + return basename[:len(basename)-4] } + return "" } -func (l *DiskLocation) concurrentLoadingVolumes(needleMapKind NeedleMapType, concurrency int) { +func (l *DiskLocation) loadExistingVolume(fileInfo os.FileInfo, needleMapKind NeedleMapKind) bool { + basename := fileInfo.Name() + if fileInfo.IsDir() { + return false + } + volumeName := getValidVolumeName(basename) + if volumeName == "" { + return false + } + + // check for incomplete volume + noteFile := l.Directory + "/" + volumeName + ".note" + if util.FileExists(noteFile) { + note, _ := ioutil.ReadFile(noteFile) + glog.Warningf("volume %s was not completed: %s", volumeName, string(note)) + removeVolumeFiles(l.Directory + "/" + volumeName) + removeVolumeFiles(l.IdxDirectory + "/" + volumeName) + return false + } + + // parse out collection, volume id + vid, collection, err := volumeIdFromFileName(basename) + if err != nil { + glog.Warningf("get volume id failed, %s, err : %s", volumeName, err) + return false + } + + // avoid loading one volume more than once + l.volumesLock.RLock() + _, found := l.volumes[vid] + l.volumesLock.RUnlock() + if found { + glog.V(1).Infof("loaded volume, %v", vid) + return true + } + + // load the volume + v, e := NewVolume(l.Directory, l.IdxDirectory, collection, vid, needleMapKind, nil, nil, 0, 0) + if e != nil { + glog.V(0).Infof("new volume %s error %s", volumeName, e) + return false + } + + l.SetVolume(vid, v) + + size, _, _ := 
v.FileStat() + glog.V(0).Infof("data file %s, replication=%s v=%d size=%d ttl=%s", + l.Directory+"/"+volumeName+".dat", v.ReplicaPlacement, v.Version(), size, v.Ttl.String()) + return true +} + +func (l *DiskLocation) concurrentLoadingVolumes(needleMapKind NeedleMapKind, concurrency int) { task_queue := make(chan os.FileInfo, 10*concurrency) go func() { - if dirs, err := ioutil.ReadDir(l.Directory); err == nil { - for _, dir := range dirs { - task_queue <- dir + foundVolumeNames := make(map[string]bool) + if fileInfos, err := ioutil.ReadDir(l.Directory); err == nil { + for _, fi := range fileInfos { + volumeName := getValidVolumeName(fi.Name()) + if volumeName == "" { + continue + } + if _, found := foundVolumeNames[volumeName]; !found { + foundVolumeNames[volumeName] = true + task_queue <- fi + } } } close(task_queue) @@ -93,8 +161,8 @@ func (l *DiskLocation) concurrentLoadingVolumes(needleMapKind NeedleMapType, con wg.Add(1) go func() { defer wg.Done() - for dir := range task_queue { - l.loadExistingVolume(dir, needleMapKind) + for fi := range task_queue { + _ = l.loadExistingVolume(fi, needleMapKind) } }() } @@ -102,7 +170,7 @@ func (l *DiskLocation) concurrentLoadingVolumes(needleMapKind NeedleMapType, con } -func (l *DiskLocation) loadExistingVolumes(needleMapKind NeedleMapType) { +func (l *DiskLocation) loadExistingVolumes(needleMapKind NeedleMapKind) { l.concurrentLoadingVolumes(needleMapKind, 10) glog.V(0).Infof("Store started on dir: %s with %d volumes max %d", l.Directory, len(l.volumes), l.MaxVolumeCount) @@ -158,7 +226,7 @@ func (l *DiskLocation) DeleteCollectionFromDiskLocation(collection string) (e er return } -func (l *DiskLocation) deleteVolumeById(vid needle.VolumeId) (e error) { +func (l *DiskLocation) deleteVolumeById(vid needle.VolumeId) (found bool, e error) { v, ok := l.volumes[vid] if !ok { return @@ -167,21 +235,15 @@ func (l *DiskLocation) deleteVolumeById(vid needle.VolumeId) (e error) { if e != nil { return } + found = true delete(l.volumes, vid) return } -func (l *DiskLocation) LoadVolume(vid needle.VolumeId, needleMapKind NeedleMapType) bool { - if fileInfos, err := ioutil.ReadDir(l.Directory); err == nil { - for _, fileInfo := range fileInfos { - volId, _, err := l.volumeIdFromPath(fileInfo) - if vid == volId && err == nil { - l.loadExistingVolume(fileInfo, needleMapKind) - return true - } - } +func (l *DiskLocation) LoadVolume(vid needle.VolumeId, needleMapKind NeedleMapKind) bool { + if fileInfo, found := l.LocateVolume(vid); found { + return l.loadExistingVolume(fileInfo, needleMapKind) } - return false } @@ -193,7 +255,8 @@ func (l *DiskLocation) DeleteVolume(vid needle.VolumeId) error { if !ok { return fmt.Errorf("Volume not found, VolumeId: %d", vid) } - return l.deleteVolumeById(vid) + _, err := l.deleteVolumeById(vid) + return err } func (l *DiskLocation) UnloadVolume(vid needle.VolumeId) error { @@ -217,7 +280,7 @@ func (l *DiskLocation) unmountVolumeByCollection(collectionName string) map[need } } - for k, _ := range deltaVols { + for k := range deltaVols { delete(l.volumes, k) } return deltaVols @@ -228,6 +291,7 @@ func (l *DiskLocation) SetVolume(vid needle.VolumeId, volume *Volume) { defer l.volumesLock.Unlock() l.volumes[vid] = volume + volume.location = l } func (l *DiskLocation) FindVolume(vid needle.VolumeId) (*Volume, bool) { @@ -260,3 +324,53 @@ func (l *DiskLocation) Close() { return } + +func (l *DiskLocation) LocateVolume(vid needle.VolumeId) (os.FileInfo, bool) { + if fileInfos, err := ioutil.ReadDir(l.Directory); err == nil { + for _, 
fileInfo := range fileInfos { + volId, _, err := volumeIdFromFileName(fileInfo.Name()) + if vid == volId && err == nil { + return fileInfo, true + } + } + } + + return nil, false +} + +func (l *DiskLocation) UnUsedSpace(volumeSizeLimit uint64) (unUsedSpace uint64) { + + l.volumesLock.RLock() + defer l.volumesLock.RUnlock() + + for _, vol := range l.volumes { + if vol.IsReadOnly() { + continue + } + datSize, idxSize, _ := vol.FileStat() + unUsedSpace += volumeSizeLimit - (datSize + idxSize) + } + + return +} + +func (l *DiskLocation) CheckDiskSpace() { + for { + if dir, e := filepath.Abs(l.Directory); e == nil { + s := stats.NewDiskStatus(dir) + stats.VolumeServerResourceGauge.WithLabelValues(l.Directory, "all").Set(float64(s.All)) + stats.VolumeServerResourceGauge.WithLabelValues(l.Directory, "used").Set(float64(s.Used)) + stats.VolumeServerResourceGauge.WithLabelValues(l.Directory, "free").Set(float64(s.Free)) + if (s.PercentFree < l.MinFreeSpacePercent) != l.isDiskSpaceLow { + l.isDiskSpaceLow = !l.isDiskSpaceLow + } + if l.isDiskSpaceLow { + glog.V(0).Infof("dir %s freePercent %.2f%% < min %.2f%%, isLowDiskSpace: %v", dir, s.PercentFree, l.MinFreeSpacePercent, l.isDiskSpaceLow) + } else { + glog.V(4).Infof("dir %s freePercent %.2f%% < min %.2f%%, isLowDiskSpace: %v", dir, s.PercentFree, l.MinFreeSpacePercent, l.isDiskSpaceLow) + } + } + time.Sleep(time.Minute) + } + +} diff --git a/weed/storage/disk_location_ec.go b/weed/storage/disk_location_ec.go index f6c44e966..91c7d86a6 100644 --- a/weed/storage/disk_location_ec.go +++ b/weed/storage/disk_location_ec.go @@ -3,6 +3,7 @@ package storage import ( "fmt" "io/ioutil" + "os" "path" "regexp" "sort" @@ -13,7 +14,7 @@ import ( ) var ( - re = regexp.MustCompile("\\.ec[0-9][0-9]") + re = regexp.MustCompile(`\.ec[0-9][0-9]`) ) func (l *DiskLocation) FindEcVolume(vid needle.VolumeId) (*erasure_coding.EcVolume, bool) { @@ -56,15 +57,18 @@ func (l *DiskLocation) FindEcShard(vid needle.VolumeId, shardId erasure_coding.S func (l *DiskLocation) LoadEcShard(collection string, vid needle.VolumeId, shardId erasure_coding.ShardId) (err error) { - ecVolumeShard, err := erasure_coding.NewEcVolumeShard(l.Directory, collection, vid, shardId) + ecVolumeShard, err := erasure_coding.NewEcVolumeShard(l.DiskType, l.Directory, collection, vid, shardId) if err != nil { + if err == os.ErrNotExist { + return os.ErrNotExist + } return fmt.Errorf("failed to create ec shard %d.%d: %v", vid, shardId, err) } l.ecVolumesLock.Lock() defer l.ecVolumesLock.Unlock() ecVolume, found := l.ecVolumes[vid] if !found { - ecVolume, err = erasure_coding.NewEcVolume(l.Directory, collection, vid) + ecVolume, err = erasure_coding.NewEcVolume(l.DiskType, l.Directory, l.IdxDirectory, collection, vid) if err != nil { return fmt.Errorf("failed to create ec volume %d: %v", vid, err) } @@ -118,6 +122,13 @@ func (l *DiskLocation) loadAllEcShards() (err error) { if err != nil { return fmt.Errorf("load all ec shards in dir %s: %v", l.Directory, err) } + if l.IdxDirectory != l.Directory { + indexFileInfos, err := ioutil.ReadDir(l.IdxDirectory) + if err != nil { + return fmt.Errorf("load all ec shards in dir %s: %v", l.IdxDirectory, err) + } + fileInfos = append(fileInfos, indexFileInfos...) 
+ } sort.Slice(fileInfos, func(i, j int) bool { return fileInfos[i].Name() < fileInfos[j].Name() @@ -183,3 +194,10 @@ func (l *DiskLocation) unmountEcVolumeByCollection(collectionName string) map[ne } return deltaVols } + +func (l *DiskLocation) EcVolumesLen() int { + l.ecVolumesLock.RLock() + defer l.ecVolumesLock.RUnlock() + + return len(l.ecVolumes) +} diff --git a/weed/storage/erasure_coding/389.ecx b/weed/storage/erasure_coding/389.ecx Binary files differnew file mode 100644 index 000000000..158781920 --- /dev/null +++ b/weed/storage/erasure_coding/389.ecx diff --git a/weed/storage/erasure_coding/ec_decoder.go b/weed/storage/erasure_coding/ec_decoder.go index ae77cee3f..47d3c6550 100644 --- a/weed/storage/erasure_coding/ec_decoder.go +++ b/weed/storage/erasure_coding/ec_decoder.go @@ -11,6 +11,7 @@ import ( "github.com/chrislusf/seaweedfs/weed/storage/needle_map" "github.com/chrislusf/seaweedfs/weed/storage/super_block" "github.com/chrislusf/seaweedfs/weed/storage/types" + "github.com/chrislusf/seaweedfs/weed/util" ) // write .idx file from .ecx and .ecj files @@ -44,20 +45,20 @@ func WriteIdxFileFromEcIndex(baseFileName string) (err error) { // FindDatFileSize calculate .dat file size from max offset entry // there may be extra deletions after that entry // but they are deletions anyway -func FindDatFileSize(baseFileName string) (datSize int64, err error) { +func FindDatFileSize(dataBaseFileName, indexBaseFileName string) (datSize int64, err error) { - version, err := readEcVolumeVersion(baseFileName) + version, err := readEcVolumeVersion(dataBaseFileName) if err != nil { - return 0, fmt.Errorf("read ec volume %s version: %v", baseFileName, err) + return 0, fmt.Errorf("read ec volume %s version: %v", dataBaseFileName, err) } - err = iterateEcxFile(baseFileName, func(key types.NeedleId, offset types.Offset, size uint32) error { + err = iterateEcxFile(indexBaseFileName, func(key types.NeedleId, offset types.Offset, size types.Size) error { - if size == types.TombstoneFileSize { + if size.IsDeleted() { return nil } - entryStopOffset := offset.ToAcutalOffset() + needle.GetActualSize(size, version) + entryStopOffset := offset.ToActualOffset() + needle.GetActualSize(size, version) if datSize < entryStopOffset { datSize = entryStopOffset } @@ -87,7 +88,7 @@ func readEcVolumeVersion(baseFileName string) (version needle.Version, err error } -func iterateEcxFile(baseFileName string, processNeedleFn func(key types.NeedleId, offset types.Offset, size uint32) error) error { +func iterateEcxFile(baseFileName string, processNeedleFn func(key types.NeedleId, offset types.Offset, size types.Size) error) error { ecxFile, openErr := os.OpenFile(baseFileName+".ecx", os.O_RDONLY, 0644) if openErr != nil { return fmt.Errorf("cannot open ec index %s.ecx: %v", baseFileName, openErr) @@ -118,9 +119,12 @@ func iterateEcxFile(baseFileName string, processNeedleFn func(key types.NeedleId } func iterateEcjFile(baseFileName string, processNeedleFn func(key types.NeedleId) error) error { + if !util.FileExists(baseFileName + ".ecj") { + return nil + } ecjFile, openErr := os.OpenFile(baseFileName+".ecj", os.O_RDONLY, 0644) if openErr != nil { - return fmt.Errorf("cannot open ec index %s.ecx: %v", baseFileName, openErr) + return fmt.Errorf("cannot open ec index %s.ecj: %v", baseFileName, openErr) } defer ecjFile.Close() diff --git a/weed/storage/erasure_coding/ec_encoder.go b/weed/storage/erasure_coding/ec_encoder.go index eeb384b91..34b639407 100644 --- a/weed/storage/erasure_coding/ec_encoder.go +++ 
b/weed/storage/erasure_coding/ec_encoder.go @@ -5,12 +5,13 @@ import ( "io" "os" + "github.com/klauspost/reedsolomon" + "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/storage/idx" "github.com/chrislusf/seaweedfs/weed/storage/needle_map" "github.com/chrislusf/seaweedfs/weed/storage/types" "github.com/chrislusf/seaweedfs/weed/util" - "github.com/klauspost/reedsolomon" ) const ( @@ -25,9 +26,12 @@ const ( // all keys are sorted in ascending order func WriteSortedFileFromIdx(baseFileName string, ext string) (e error) { - cm, err := readCompactMap(baseFileName) + nm, err := readNeedleMap(baseFileName) + if nm != nil { + defer nm.Close() + } if err != nil { - return fmt.Errorf("readCompactMap: %v", err) + return fmt.Errorf("readNeedleMap: %v", err) } ecxFile, err := os.OpenFile(baseFileName+ext, os.O_TRUNC|os.O_CREATE|os.O_WRONLY, 0644) @@ -36,7 +40,7 @@ func WriteSortedFileFromIdx(baseFileName string, ext string) (e error) { } defer ecxFile.Close() - err = cm.AscendingVisit(func(value needle_map.NeedleValue) error { + err = nm.AscendingVisit(func(value needle_map.NeedleValue) error { bytes := value.ToBytes() _, writeErr := ecxFile.Write(bytes) return writeErr @@ -73,6 +77,8 @@ func generateEcFiles(baseFileName string, bufferSize int, largeBlockSize int64, if err != nil { return fmt.Errorf("failed to stat dat file: %v", err) } + + glog.V(0).Infof("encodeDatFile %s.dat size:%d", baseFileName, fi.Size()) err = encodeDatFile(fi.Size(), err, baseFileName, bufferSize, largeBlockSize, file, smallBlockSize) if err != nil { return fmt.Errorf("encodeDatFile: %v", err) @@ -195,7 +201,7 @@ func encodeDatFile(remainingSize int64, err error, baseFileName string, bufferSi } buffers := make([][]byte, TotalShardsCount) - for i, _ := range buffers { + for i := range buffers { buffers[i] = make([]byte, bufferSize) } @@ -232,7 +238,7 @@ func rebuildEcFiles(shardHasData []bool, inputFiles []*os.File, outputFiles []*o } buffers := make([][]byte, TotalShardsCount) - for i, _ := range buffers { + for i := range buffers { if shardHasData[i] { buffers[i] = make([]byte, ErasureCodingSmallBlockSize) } @@ -280,15 +286,15 @@ func rebuildEcFiles(shardHasData []bool, inputFiles []*os.File, outputFiles []*o } -func readCompactMap(baseFileName string) (*needle_map.CompactMap, error) { +func readNeedleMap(baseFileName string) (*needle_map.MemDb, error) { indexFile, err := os.OpenFile(baseFileName+".idx", os.O_RDONLY, 0644) if err != nil { return nil, fmt.Errorf("cannot read Volume Index %s.idx: %v", baseFileName, err) } defer indexFile.Close() - cm := needle_map.NewCompactMap() - err = idx.WalkIndexFile(indexFile, func(key types.NeedleId, offset types.Offset, size uint32) error { + cm := needle_map.NewMemDb() + err = idx.WalkIndexFile(indexFile, func(key types.NeedleId, offset types.Offset, size types.Size) error { if !offset.IsZero() && size != types.TombstoneFileSize { cm.Set(key, offset, size) } else { diff --git a/weed/storage/erasure_coding/ec_locate.go b/weed/storage/erasure_coding/ec_locate.go index 562966f8f..19eba6235 100644 --- a/weed/storage/erasure_coding/ec_locate.go +++ b/weed/storage/erasure_coding/ec_locate.go @@ -1,14 +1,18 @@ package erasure_coding +import ( + "github.com/chrislusf/seaweedfs/weed/storage/types" +) + type Interval struct { BlockIndex int InnerBlockOffset int64 - Size uint32 + Size types.Size IsLargeBlock bool LargeBlockRowsCount int } -func LocateData(largeBlockLength, smallBlockLength int64, datSize int64, offset int64, size uint32) (intervals []Interval) { 
+func LocateData(largeBlockLength, smallBlockLength int64, datSize int64, offset int64, size types.Size) (intervals []Interval) { blockIndex, isLargeBlock, innerBlockOffset := locateOffset(largeBlockLength, smallBlockLength, datSize, offset) // adding DataShardsCount*smallBlockLength to ensure we can derive the number of large block size from a shard size @@ -32,7 +36,7 @@ func LocateData(largeBlockLength, smallBlockLength int64, datSize int64, offset intervals = append(intervals, interval) return } - interval.Size = uint32(blockRemaining) + interval.Size = types.Size(blockRemaining) intervals = append(intervals, interval) size -= interval.Size diff --git a/weed/storage/erasure_coding/ec_shard.go b/weed/storage/erasure_coding/ec_shard.go index 47e6d3d1e..2a57d85ef 100644 --- a/weed/storage/erasure_coding/ec_shard.go +++ b/weed/storage/erasure_coding/ec_shard.go @@ -2,9 +2,11 @@ package erasure_coding import ( "fmt" + "github.com/chrislusf/seaweedfs/weed/storage/types" "os" "path" "strconv" + "strings" "github.com/chrislusf/seaweedfs/weed/stats" "github.com/chrislusf/seaweedfs/weed/storage/needle" @@ -19,21 +21,25 @@ type EcVolumeShard struct { dir string ecdFile *os.File ecdFileSize int64 + DiskType types.DiskType } -func NewEcVolumeShard(dirname string, collection string, id needle.VolumeId, shardId ShardId) (v *EcVolumeShard, e error) { +func NewEcVolumeShard(diskType types.DiskType, dirname string, collection string, id needle.VolumeId, shardId ShardId) (v *EcVolumeShard, e error) { - v = &EcVolumeShard{dir: dirname, Collection: collection, VolumeId: id, ShardId: shardId} + v = &EcVolumeShard{dir: dirname, Collection: collection, VolumeId: id, ShardId: shardId, DiskType: diskType} baseFileName := v.FileName() // open ecd file if v.ecdFile, e = os.OpenFile(baseFileName+ToExt(int(shardId)), os.O_RDONLY, 0644); e != nil { - return nil, fmt.Errorf("cannot read ec volume shard %s.%s: %v", baseFileName, ToExt(int(shardId)), e) + if e == os.ErrNotExist || strings.Contains(e.Error(), "no such file or directory") { + return nil, os.ErrNotExist + } + return nil, fmt.Errorf("cannot read ec volume shard %s%s: %v", baseFileName, ToExt(int(shardId)), e) } ecdFi, statErr := v.ecdFile.Stat() if statErr != nil { - return nil, fmt.Errorf("can not stat ec volume shard %s.%s: %v", baseFileName, ToExt(int(shardId)), statErr) + return nil, fmt.Errorf("can not stat ec volume shard %s%s: %v", baseFileName, ToExt(int(shardId)), statErr) } v.ecdFileSize = ecdFi.Size() diff --git a/weed/storage/erasure_coding/ec_test.go b/weed/storage/erasure_coding/ec_test.go index 0e4aaa27c..0d48bec02 100644 --- a/weed/storage/erasure_coding/ec_test.go +++ b/weed/storage/erasure_coding/ec_test.go @@ -7,9 +7,10 @@ import ( "os" "testing" + "github.com/klauspost/reedsolomon" + "github.com/chrislusf/seaweedfs/weed/storage/needle_map" "github.com/chrislusf/seaweedfs/weed/storage/types" - "github.com/klauspost/reedsolomon" ) const ( @@ -41,9 +42,10 @@ func TestEncodingDecoding(t *testing.T) { } func validateFiles(baseFileName string) error { - cm, err := readCompactMap(baseFileName) + nm, err := readNeedleMap(baseFileName) + defer nm.Close() if err != nil { - return fmt.Errorf("readCompactMap: %v", err) + return fmt.Errorf("readNeedleMap: %v", err) } datFile, err := os.OpenFile(baseFileName+".dat", os.O_RDONLY, 0) @@ -60,7 +62,7 @@ func validateFiles(baseFileName string) error { ecFiles, err := openEcFiles(baseFileName, true) defer closeEcFiles(ecFiles) - err = cm.AscendingVisit(func(value needle_map.NeedleValue) error { + err = 
nm.AscendingVisit(func(value needle_map.NeedleValue) error { return assertSame(datFile, fi.Size(), ecFiles, value.Offset, value.Size) }) if err != nil { @@ -69,7 +71,7 @@ func validateFiles(baseFileName string) error { return nil } -func assertSame(datFile *os.File, datSize int64, ecFiles []*os.File, offset types.Offset, size uint32) error { +func assertSame(datFile *os.File, datSize int64, ecFiles []*os.File, offset types.Offset, size types.Size) error { data, err := readDatFile(datFile, offset, size) if err != nil { @@ -88,10 +90,10 @@ func assertSame(datFile *os.File, datSize int64, ecFiles []*os.File, offset type return nil } -func readDatFile(datFile *os.File, offset types.Offset, size uint32) ([]byte, error) { +func readDatFile(datFile *os.File, offset types.Offset, size types.Size) ([]byte, error) { data := make([]byte, size) - n, err := datFile.ReadAt(data, offset.ToAcutalOffset()) + n, err := datFile.ReadAt(data, offset.ToActualOffset()) if err != nil { return nil, fmt.Errorf("failed to ReadAt dat file: %v", err) } @@ -101,9 +103,9 @@ func readDatFile(datFile *os.File, offset types.Offset, size uint32) ([]byte, er return data, nil } -func readEcFile(datSize int64, ecFiles []*os.File, offset types.Offset, size uint32) (data []byte, err error) { +func readEcFile(datSize int64, ecFiles []*os.File, offset types.Offset, size types.Size) (data []byte, err error) { - intervals := LocateData(largeBlockSize, smallBlockSize, datSize, offset.ToAcutalOffset(), size) + intervals := LocateData(largeBlockSize, smallBlockSize, datSize, offset.ToActualOffset(), size) for i, interval := range intervals { if d, e := readOneInterval(interval, ecFiles); e != nil { @@ -138,7 +140,7 @@ func readOneInterval(interval Interval, ecFiles []*os.File) (data []byte, err er return } -func readFromOtherEcFiles(ecFiles []*os.File, ecFileIndex int, ecFileOffset int64, size uint32) (data []byte, err error) { +func readFromOtherEcFiles(ecFiles []*os.File, ecFileIndex int, ecFileOffset int64, size types.Size) (data []byte, err error) { enc, err := reedsolomon.New(DataShardsCount, ParityShardsCount) if err != nil { return nil, fmt.Errorf("failed to create encoder: %v", err) diff --git a/weed/storage/erasure_coding/ec_volume.go b/weed/storage/erasure_coding/ec_volume.go index 579f037fb..171db92a4 100644 --- a/weed/storage/erasure_coding/ec_volume.go +++ b/weed/storage/erasure_coding/ec_volume.go @@ -25,6 +25,7 @@ type EcVolume struct { VolumeId needle.VolumeId Collection string dir string + dirIdx string ecxFile *os.File ecxFileSize int64 ecxCreatedAt time.Time @@ -35,35 +36,37 @@ type EcVolume struct { Version needle.Version ecjFile *os.File ecjFileAccessLock sync.Mutex + diskType types.DiskType } -func NewEcVolume(dir string, collection string, vid needle.VolumeId) (ev *EcVolume, err error) { - ev = &EcVolume{dir: dir, Collection: collection, VolumeId: vid} +func NewEcVolume(diskType types.DiskType, dir string, dirIdx string, collection string, vid needle.VolumeId) (ev *EcVolume, err error) { + ev = &EcVolume{dir: dir, dirIdx: dirIdx, Collection: collection, VolumeId: vid, diskType: diskType} - baseFileName := EcShardFileName(collection, dir, int(vid)) + dataBaseFileName := EcShardFileName(collection, dir, int(vid)) + indexBaseFileName := EcShardFileName(collection, dirIdx, int(vid)) // open ecx file - if ev.ecxFile, err = os.OpenFile(baseFileName+".ecx", os.O_RDWR, 0644); err != nil { - return nil, fmt.Errorf("cannot open ec volume index %s.ecx: %v", baseFileName, err) + if ev.ecxFile, err = 
os.OpenFile(indexBaseFileName+".ecx", os.O_RDWR, 0644); err != nil { + return nil, fmt.Errorf("cannot open ec volume index %s.ecx: %v", indexBaseFileName, err) } ecxFi, statErr := ev.ecxFile.Stat() if statErr != nil { - return nil, fmt.Errorf("can not stat ec volume index %s.ecx: %v", baseFileName, statErr) + return nil, fmt.Errorf("can not stat ec volume index %s.ecx: %v", indexBaseFileName, statErr) } ev.ecxFileSize = ecxFi.Size() ev.ecxCreatedAt = ecxFi.ModTime() // open ecj file - if ev.ecjFile, err = os.OpenFile(baseFileName+".ecj", os.O_RDWR|os.O_CREATE, 0644); err != nil { - return nil, fmt.Errorf("cannot open ec volume journal %s.ecj: %v", baseFileName, err) + if ev.ecjFile, err = os.OpenFile(indexBaseFileName+".ecj", os.O_RDWR|os.O_CREATE, 0644); err != nil { + return nil, fmt.Errorf("cannot open ec volume journal %s.ecj: %v", indexBaseFileName, err) } // read volume info ev.Version = needle.Version3 - if volumeInfo, found := pb.MaybeLoadVolumeInfo(baseFileName + ".vif"); found { + if volumeInfo, _, found, _ := pb.MaybeLoadVolumeInfo(dataBaseFileName + ".vif"); found { ev.Version = needle.Version(volumeInfo.Version) } else { - pb.SaveVolumeInfo(baseFileName+".vif", &volume_server_pb.VolumeInfo{Version: uint32(ev.Version)}) + pb.SaveVolumeInfo(dataBaseFileName+".vif", &volume_server_pb.VolumeInfo{Version: uint32(ev.Version)}) } ev.ShardLocations = make(map[ShardId][]string) @@ -134,24 +137,42 @@ func (ev *EcVolume) Destroy() { for _, s := range ev.Shards { s.Destroy() } - os.Remove(ev.FileName() + ".ecx") - os.Remove(ev.FileName() + ".ecj") - os.Remove(ev.FileName() + ".vif") + os.Remove(ev.FileName(".ecx")) + os.Remove(ev.FileName(".ecj")) + os.Remove(ev.FileName(".vif")) } -func (ev *EcVolume) FileName() string { +func (ev *EcVolume) FileName(ext string) string { + switch ext { + case ".ecx", ".ecj": + return ev.IndexBaseFileName() + ext + } + // .vif + return ev.DataBaseFileName() + ext +} +func (ev *EcVolume) DataBaseFileName() string { return EcShardFileName(ev.Collection, ev.dir, int(ev.VolumeId)) +} +func (ev *EcVolume) IndexBaseFileName() string { + return EcShardFileName(ev.Collection, ev.dirIdx, int(ev.VolumeId)) } -func (ev *EcVolume) ShardSize() int64 { +func (ev *EcVolume) ShardSize() uint64 { if len(ev.Shards) > 0 { - return ev.Shards[0].Size() + return uint64(ev.Shards[0].Size()) } return 0 } +func (ev *EcVolume) Size() (size int64) { + for _, shard := range ev.Shards { + size += shard.Size() + } + return +} + func (ev *EcVolume) CreatedAt() time.Time { return ev.ecxCreatedAt } @@ -171,6 +192,7 @@ func (ev *EcVolume) ToVolumeEcShardInformationMessage() (messages []*master_pb.V m = &master_pb.VolumeEcShardInformationMessage{ Id: uint32(s.VolumeId), Collection: s.Collection, + DiskType: string(ev.diskType), } messages = append(messages, m) } @@ -180,7 +202,7 @@ func (ev *EcVolume) ToVolumeEcShardInformationMessage() (messages []*master_pb.V return } -func (ev *EcVolume) LocateEcShardNeedle(needleId types.NeedleId, version needle.Version) (offset types.Offset, size uint32, intervals []Interval, err error) { +func (ev *EcVolume) LocateEcShardNeedle(needleId types.NeedleId, version needle.Version) (offset types.Offset, size types.Size, intervals []Interval, err error) { // find the needle from ecx file offset, size, err = ev.FindNeedleFromEcx(needleId) @@ -191,16 +213,16 @@ func (ev *EcVolume) LocateEcShardNeedle(needleId types.NeedleId, version needle. 
shard := ev.Shards[0] // calculate the locations in the ec shards - intervals = LocateData(ErasureCodingLargeBlockSize, ErasureCodingSmallBlockSize, DataShardsCount*shard.ecdFileSize, offset.ToAcutalOffset(), uint32(needle.GetActualSize(size, version))) + intervals = LocateData(ErasureCodingLargeBlockSize, ErasureCodingSmallBlockSize, DataShardsCount*shard.ecdFileSize, offset.ToActualOffset(), types.Size(needle.GetActualSize(size, version))) return } -func (ev *EcVolume) FindNeedleFromEcx(needleId types.NeedleId) (offset types.Offset, size uint32, err error) { +func (ev *EcVolume) FindNeedleFromEcx(needleId types.NeedleId) (offset types.Offset, size types.Size, err error) { return SearchNeedleFromSortedIndex(ev.ecxFile, ev.ecxFileSize, needleId, nil) } -func SearchNeedleFromSortedIndex(ecxFile *os.File, ecxFileSize int64, needleId types.NeedleId, processNeedleFn func(file *os.File, offset int64) error) (offset types.Offset, size uint32, err error) { +func SearchNeedleFromSortedIndex(ecxFile *os.File, ecxFileSize int64, needleId types.NeedleId, processNeedleFn func(file *os.File, offset int64) error) (offset types.Offset, size types.Size, err error) { var key types.NeedleId buf := make([]byte, types.NeedleMapEntrySize) l, h := int64(0), ecxFileSize/types.NeedleMapEntrySize diff --git a/weed/storage/erasure_coding/ec_volume_delete.go b/weed/storage/erasure_coding/ec_volume_delete.go index 822a9e923..a7f8c24a3 100644 --- a/weed/storage/erasure_coding/ec_volume_delete.go +++ b/weed/storage/erasure_coding/ec_volume_delete.go @@ -12,7 +12,7 @@ import ( var ( MarkNeedleDeleted = func(file *os.File, offset int64) error { b := make([]byte, types.SizeSize) - util.Uint32toBytes(b, types.TombstoneFileSize) + types.SizeToBytes(b, types.TombstoneFileSize) n, err := file.WriteAt(b, offset+types.NeedleIdSize+types.OffsetSize) if err != nil { return fmt.Errorf("sorted needle write error: %v", err) diff --git a/weed/storage/erasure_coding/ec_volume_info.go b/weed/storage/erasure_coding/ec_volume_info.go index 8ff65bb0f..3dd535e64 100644 --- a/weed/storage/erasure_coding/ec_volume_info.go +++ b/weed/storage/erasure_coding/ec_volume_info.go @@ -10,13 +10,15 @@ type EcVolumeInfo struct { VolumeId needle.VolumeId Collection string ShardBits ShardBits + DiskType string } -func NewEcVolumeInfo(collection string, vid needle.VolumeId, shardBits ShardBits) *EcVolumeInfo { +func NewEcVolumeInfo(diskType string, collection string, vid needle.VolumeId, shardBits ShardBits) *EcVolumeInfo { return &EcVolumeInfo{ Collection: collection, VolumeId: vid, ShardBits: shardBits, + DiskType: diskType, } } @@ -45,6 +47,7 @@ func (ecInfo *EcVolumeInfo) Minus(other *EcVolumeInfo) *EcVolumeInfo { VolumeId: ecInfo.VolumeId, Collection: ecInfo.Collection, ShardBits: ecInfo.ShardBits.Minus(other.ShardBits), + DiskType: ecInfo.DiskType, } return ret @@ -55,6 +58,7 @@ func (ecInfo *EcVolumeInfo) ToVolumeEcShardInformationMessage() (ret *master_pb. 
Id: uint32(ecInfo.VolumeId), EcIndexBits: uint32(ecInfo.ShardBits), Collection: ecInfo.Collection, + DiskType: ecInfo.DiskType, } } diff --git a/weed/storage/erasure_coding/ec_volume_test.go b/weed/storage/erasure_coding/ec_volume_test.go new file mode 100644 index 000000000..747ef4aab --- /dev/null +++ b/weed/storage/erasure_coding/ec_volume_test.go @@ -0,0 +1,54 @@ +package erasure_coding + +import ( + "fmt" + "os" + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/chrislusf/seaweedfs/weed/storage/needle" + "github.com/chrislusf/seaweedfs/weed/storage/types" +) + +func TestPositioning(t *testing.T) { + + ecxFile, err := os.OpenFile("389.ecx", os.O_RDONLY, 0) + if err != nil { + t.Errorf("failed to open ecx file: %v", err) + } + defer ecxFile.Close() + + stat, _ := ecxFile.Stat() + fileSize := stat.Size() + + tests := []struct { + needleId string + offset int64 + size int + }{ + {needleId: "0f0edb92", offset: 31300679656, size: 1167}, + {needleId: "0ef7d7f8", offset: 11513014944, size: 66044}, + } + + for _, test := range tests { + needleId, _ := types.ParseNeedleId(test.needleId) + offset, size, err := SearchNeedleFromSortedIndex(ecxFile, fileSize, needleId, nil) + assert.Equal(t, nil, err, "SearchNeedleFromSortedIndex") + fmt.Printf("offset: %d size: %d\n", offset.ToActualOffset(), size) + } + + needleId, _ := types.ParseNeedleId("0f087622") + offset, size, err := SearchNeedleFromSortedIndex(ecxFile, fileSize, needleId, nil) + assert.Equal(t, nil, err, "SearchNeedleFromSortedIndex") + fmt.Printf("offset: %d size: %d\n", offset.ToActualOffset(), size) + + var shardEcdFileSize int64 = 1118830592 // 1024*1024*1024*3 + intervals := LocateData(ErasureCodingLargeBlockSize, ErasureCodingSmallBlockSize, DataShardsCount*shardEcdFileSize, offset.ToActualOffset(), types.Size(needle.GetActualSize(size, needle.CurrentVersion))) + + for _, interval := range intervals { + shardId, shardOffset := interval.ToShardIdAndOffset(ErasureCodingLargeBlockSize, ErasureCodingSmallBlockSize) + fmt.Printf("interval: %+v, shardId: %d, shardOffset: %d\n", interval, shardId, shardOffset) + } + +} diff --git a/weed/storage/idx/walk.go b/weed/storage/idx/walk.go index 90efb75e6..5215d3c4f 100644 --- a/weed/storage/idx/walk.go +++ b/weed/storage/idx/walk.go @@ -2,25 +2,26 @@ package idx import ( "io" - "os" "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/storage/types" - "github.com/chrislusf/seaweedfs/weed/util" ) // walks through the index file, calls fn function with each key, offset, size // stops with the error returned by the fn function -func WalkIndexFile(r *os.File, fn func(key types.NeedleId, offset types.Offset, size uint32) error) error { +func WalkIndexFile(r io.ReaderAt, fn func(key types.NeedleId, offset types.Offset, size types.Size) error) error { var readerOffset int64 bytes := make([]byte, types.NeedleMapEntrySize*RowsToRead) count, e := r.ReadAt(bytes, readerOffset) - glog.V(3).Infoln("file", r.Name(), "readerOffset", readerOffset, "count", count, "e", e) + if count == 0 && e == io.EOF { + return nil + } + glog.V(3).Infof("readerOffset %d count %d err: %v", readerOffset, count, e) readerOffset += int64(count) var ( key types.NeedleId offset types.Offset - size uint32 + size types.Size i int ) @@ -35,16 +36,16 @@ func WalkIndexFile(r *os.File, fn func(key types.NeedleId, offset types.Offset, return nil } count, e = r.ReadAt(bytes, readerOffset) - glog.V(3).Infoln("file", r.Name(), "readerOffset", readerOffset, "count", count, "e", e) + 
glog.V(3).Infof("readerOffset %d count %d err: %v", readerOffset, count, e) readerOffset += int64(count) } return e } -func IdxFileEntry(bytes []byte) (key types.NeedleId, offset types.Offset, size uint32) { +func IdxFileEntry(bytes []byte) (key types.NeedleId, offset types.Offset, size types.Size) { key = types.BytesToNeedleId(bytes[:types.NeedleIdSize]) offset = types.BytesToOffset(bytes[types.NeedleIdSize : types.NeedleIdSize+types.OffsetSize]) - size = util.BytesToUint32(bytes[types.NeedleIdSize+types.OffsetSize : types.NeedleIdSize+types.OffsetSize+types.SizeSize]) + size = types.BytesToSize(bytes[types.NeedleIdSize+types.OffsetSize : types.NeedleIdSize+types.OffsetSize+types.SizeSize]) return } diff --git a/weed/storage/needle/async_request.go b/weed/storage/needle/async_request.go new file mode 100644 index 000000000..ea02c55c5 --- /dev/null +++ b/weed/storage/needle/async_request.go @@ -0,0 +1,53 @@ +package needle + +type AsyncRequest struct { + N *Needle + IsWriteRequest bool + ActualSize int64 + offset uint64 + size uint64 + doneChan chan interface{} + isUnchanged bool + err error +} + +func NewAsyncRequest(n *Needle, isWriteRequest bool) *AsyncRequest { + return &AsyncRequest{ + offset: 0, + size: 0, + ActualSize: 0, + doneChan: make(chan interface{}), + N: n, + isUnchanged: false, + IsWriteRequest: isWriteRequest, + err: nil, + } +} + +func (r *AsyncRequest) WaitComplete() (uint64, uint64, bool, error) { + <-r.doneChan + return r.offset, r.size, r.isUnchanged, r.err +} + +func (r *AsyncRequest) Complete(offset uint64, size uint64, isUnchanged bool, err error) { + r.offset = offset + r.size = size + r.isUnchanged = isUnchanged + r.err = err + close(r.doneChan) +} + +func (r *AsyncRequest) UpdateResult(offset uint64, size uint64, isUnchanged bool, err error) { + r.offset = offset + r.size = size + r.isUnchanged = isUnchanged + r.err = err +} + +func (r *AsyncRequest) Submit() { + close(r.doneChan) +} + +func (r *AsyncRequest) IsSucceed() bool { + return r.err == nil +} diff --git a/weed/storage/needle/crc.go b/weed/storage/needle/crc.go index 00ea1db69..4476631c2 100644 --- a/weed/storage/needle/crc.go +++ b/weed/storage/needle/crc.go @@ -1,11 +1,12 @@ package needle import ( - "crypto/md5" "fmt" + "io" - "github.com/chrislusf/seaweedfs/weed/util" "github.com/klauspost/crc32" + + "github.com/chrislusf/seaweedfs/weed/util" ) var table = crc32.MakeTable(crc32.Castagnoli) @@ -30,12 +31,24 @@ func (n *Needle) Etag() string { return fmt.Sprintf("%x", bits) } -func (n *Needle) MD5() string { +func NewCRCwriter(w io.Writer) *CRCwriter { - hash := md5.New() + return &CRCwriter{ + crc: CRC(0), + w: w, + } - hash.Write(n.Data) +} - return fmt.Sprintf("%x", hash.Sum(nil)) +type CRCwriter struct { + crc CRC + w io.Writer +} +func (c *CRCwriter) Write(p []byte) (n int, err error) { + n, err = c.w.Write(p) // with each write ... 
+ c.crc = c.crc.Update(p) + return } + +func (c *CRCwriter) Sum() uint32 { return c.crc.Value() } // final hash diff --git a/weed/storage/needle/file_id.go b/weed/storage/needle/file_id.go index 5dabb0f25..6055bdd1c 100644 --- a/weed/storage/needle/file_id.go +++ b/weed/storage/needle/file_id.go @@ -66,7 +66,7 @@ func formatNeedleIdCookie(key NeedleId, cookie Cookie) string { NeedleIdToBytes(bytes[0:NeedleIdSize], key) CookieToBytes(bytes[NeedleIdSize:NeedleIdSize+CookieSize], cookie) nonzero_index := 0 - for ; bytes[nonzero_index] == 0; nonzero_index++ { + for ; bytes[nonzero_index] == 0 && nonzero_index < NeedleIdSize; nonzero_index++ { } return hex.EncodeToString(bytes[nonzero_index:]) } diff --git a/weed/storage/needle/needle.go b/weed/storage/needle/needle.go index 2f03ba87b..34d29ab6e 100644 --- a/weed/storage/needle/needle.go +++ b/weed/storage/needle/needle.go @@ -8,8 +8,6 @@ import ( "strings" "time" - "io/ioutil" - "github.com/chrislusf/seaweedfs/weed/images" . "github.com/chrislusf/seaweedfs/weed/storage/types" ) @@ -26,7 +24,7 @@ const ( type Needle struct { Cookie Cookie `comment:"random number to mitigate brute force lookups"` Id NeedleId `comment:"needle id"` - Size uint32 `comment:"sum of DataSize,Data,NameSize,Name,MimeSize,Mime"` + Size Size `comment:"sum of DataSize,Data,NameSize,Name,MimeSize,Mime"` DataSize uint32 `comment:"Data size"` //version2 Data []byte `comment:"The actual file data"` @@ -46,57 +44,33 @@ type Needle struct { } func (n *Needle) String() (str string) { - str = fmt.Sprintf("%s Size:%d, DataSize:%d, Name:%s, Mime:%s", formatNeedleIdCookie(n.Id, n.Cookie), n.Size, n.DataSize, n.Name, n.Mime) + str = fmt.Sprintf("%s Size:%d, DataSize:%d, Name:%s, Mime:%s Compressed:%v", formatNeedleIdCookie(n.Id, n.Cookie), n.Size, n.DataSize, n.Name, n.Mime, n.IsCompressed()) return } -func ParseUpload(r *http.Request) ( - fileName string, data []byte, mimeType string, pairMap map[string]string, isGzipped bool, originalDataSize int, - modifiedTime uint64, ttl *TTL, isChunkedFile bool, e error) { - pairMap = make(map[string]string) - for k, v := range r.Header { - if len(v) > 0 && strings.HasPrefix(k, PairNamePrefix) { - pairMap[k] = v[0] - } - } - - if r.Method == "POST" { - fileName, data, mimeType, isGzipped, originalDataSize, isChunkedFile, e = parseMultipart(r) - } else { - isGzipped = false - mimeType = r.Header.Get("Content-Type") - fileName = "" - data, e = ioutil.ReadAll(r.Body) - originalDataSize = len(data) - } - if e != nil { - return - } - - modifiedTime, _ = strconv.ParseUint(r.FormValue("ts"), 10, 64) - ttl, _ = ReadTTL(r.FormValue("ttl")) - - return -} -func CreateNeedleFromRequest(r *http.Request, fixJpgOrientation bool) (n *Needle, originalSize int, e error) { - var pairMap map[string]string - fname, mimeType, isGzipped, isChunkedFile := "", "", false, false +func CreateNeedleFromRequest(r *http.Request, fixJpgOrientation bool, sizeLimit int64) (n *Needle, originalSize int, contentMd5 string, e error) { n = new(Needle) - fname, n.Data, mimeType, pairMap, isGzipped, originalSize, n.LastModified, n.Ttl, isChunkedFile, e = ParseUpload(r) + pu, e := ParseUpload(r, sizeLimit) if e != nil { return } - if len(fname) < 256 { - n.Name = []byte(fname) + n.Data = pu.Data + originalSize = pu.OriginalDataSize + n.LastModified = pu.ModifiedTime + n.Ttl = pu.Ttl + contentMd5 = pu.ContentMd5 + + if len(pu.FileName) < 256 { + n.Name = []byte(pu.FileName) n.SetHasName() } - if len(mimeType) < 256 { - n.Mime = []byte(mimeType) + if len(pu.MimeType) < 256 { + n.Mime = 
[]byte(pu.MimeType) n.SetHasMime() } - if len(pairMap) != 0 { + if len(pu.PairMap) != 0 { trimmedPairMap := make(map[string]string) - for k, v := range pairMap { + for k, v := range pu.PairMap { trimmedPairMap[k[len(PairNamePrefix):]] = v } @@ -107,8 +81,9 @@ func CreateNeedleFromRequest(r *http.Request, fixJpgOrientation bool) (n *Needle n.SetHasPairs() } } - if isGzipped { - n.SetGzipped() + if pu.IsGzipped { + // println(r.URL.Path, "is set to compressed", pu.FileName, pu.IsGzipped, "dataSize", pu.OriginalDataSize) + n.SetIsCompressed() } if n.LastModified == 0 { n.LastModified = uint64(time.Now().Unix()) @@ -118,13 +93,13 @@ func CreateNeedleFromRequest(r *http.Request, fixJpgOrientation bool) (n *Needle n.SetHasTtl() } - if isChunkedFile { + if pu.IsChunkedFile { n.SetIsChunkManifest() } if fixJpgOrientation { - loweredName := strings.ToLower(fname) - if mimeType == "image/jpeg" || strings.HasSuffix(loweredName, ".jpg") || strings.HasSuffix(loweredName, ".jpeg") { + loweredName := strings.ToLower(pu.FileName) + if pu.MimeType == "image/jpeg" || strings.HasSuffix(loweredName, ".jpg") || strings.HasSuffix(loweredName, ".jpeg") { n.Data = images.FixJpgOrientation(n.Data) } } diff --git a/weed/storage/needle/needle_parse_multipart.go b/weed/storage/needle/needle_parse_multipart.go deleted file mode 100644 index 8be1a1da4..000000000 --- a/weed/storage/needle/needle_parse_multipart.go +++ /dev/null @@ -1,109 +0,0 @@ -package needle - -import ( - "github.com/chrislusf/seaweedfs/weed/glog" - "github.com/chrislusf/seaweedfs/weed/util" - - "io" - "io/ioutil" - "mime" - "net/http" - "path" - "strconv" - "strings" -) - -func parseMultipart(r *http.Request) ( - fileName string, data []byte, mimeType string, isGzipped bool, originalDataSize int, isChunkedFile bool, e error) { - defer func() { - if e != nil && r.Body != nil { - io.Copy(ioutil.Discard, r.Body) - r.Body.Close() - } - }() - form, fe := r.MultipartReader() - if fe != nil { - glog.V(0).Infoln("MultipartReader [ERROR]", fe) - e = fe - return - } - - //first multi-part item - part, fe := form.NextPart() - if fe != nil { - glog.V(0).Infoln("Reading Multi part [ERROR]", fe) - e = fe - return - } - - fileName = part.FileName() - if fileName != "" { - fileName = path.Base(fileName) - } - - data, e = ioutil.ReadAll(part) - if e != nil { - glog.V(0).Infoln("Reading Content [ERROR]", e) - return - } - - //if the filename is empty string, do a search on the other multi-part items - for fileName == "" { - part2, fe := form.NextPart() - if fe != nil { - break // no more or on error, just safely break - } - - fName := part2.FileName() - - //found the first <file type> multi-part has filename - if fName != "" { - data2, fe2 := ioutil.ReadAll(part2) - if fe2 != nil { - glog.V(0).Infoln("Reading Content [ERROR]", fe2) - e = fe2 - return - } - - //update - data = data2 - fileName = path.Base(fName) - break - } - } - - originalDataSize = len(data) - - isChunkedFile, _ = strconv.ParseBool(r.FormValue("cm")) - - if !isChunkedFile { - - dotIndex := strings.LastIndex(fileName, ".") - ext, mtype := "", "" - if dotIndex > 0 { - ext = strings.ToLower(fileName[dotIndex:]) - mtype = mime.TypeByExtension(ext) - } - contentType := part.Header.Get("Content-Type") - if contentType != "" && mtype != contentType { - mimeType = contentType //only return mime type if not deductable - mtype = contentType - } - - if part.Header.Get("Content-Encoding") == "gzip" { - if unzipped, e := util.UnGzipData(data); e == nil { - originalDataSize = len(unzipped) - } - isGzipped = true 
- } else if util.IsGzippable(ext, mtype, data) { - if compressedData, err := util.GzipData(data); err == nil { - if len(data) > len(compressedData) { - data = compressedData - isGzipped = true - } - } - } - } - - return -} diff --git a/weed/storage/needle/needle_parse_upload.go b/weed/storage/needle/needle_parse_upload.go new file mode 100644 index 000000000..7201503f1 --- /dev/null +++ b/weed/storage/needle/needle_parse_upload.go @@ -0,0 +1,201 @@ +package needle + +import ( + "crypto/md5" + "encoding/base64" + "fmt" + "io" + "io/ioutil" + "mime" + "net/http" + "path" + "path/filepath" + "strconv" + "strings" + + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/util" +) + +type ParsedUpload struct { + FileName string + Data []byte + MimeType string + PairMap map[string]string + IsGzipped bool + // IsZstd bool + OriginalDataSize int + ModifiedTime uint64 + Ttl *TTL + IsChunkedFile bool + UncompressedData []byte + ContentMd5 string +} + +func ParseUpload(r *http.Request, sizeLimit int64) (pu *ParsedUpload, e error) { + pu = &ParsedUpload{} + pu.PairMap = make(map[string]string) + for k, v := range r.Header { + if len(v) > 0 && strings.HasPrefix(k, PairNamePrefix) { + pu.PairMap[k] = v[0] + } + } + + if r.Method == "POST" { + e = parseMultipart(r, sizeLimit, pu) + } else { + e = parsePut(r, sizeLimit, pu) + } + if e != nil { + return + } + + pu.ModifiedTime, _ = strconv.ParseUint(r.FormValue("ts"), 10, 64) + pu.Ttl, _ = ReadTTL(r.FormValue("ttl")) + + pu.OriginalDataSize = len(pu.Data) + pu.UncompressedData = pu.Data + // println("received data", len(pu.Data), "isGzipped", pu.IsGzipped, "mime", pu.MimeType, "name", pu.FileName) + if pu.IsGzipped { + if unzipped, e := util.DecompressData(pu.Data); e == nil { + pu.OriginalDataSize = len(unzipped) + pu.UncompressedData = unzipped + // println("ungzipped data size", len(unzipped)) + } + } else { + ext := filepath.Base(pu.FileName) + mimeType := pu.MimeType + if mimeType == "" { + mimeType = http.DetectContentType(pu.Data) + } + // println("detected mimetype to", pu.MimeType) + if mimeType == "application/octet-stream" { + mimeType = "" + } + if shouldBeCompressed, iAmSure := util.IsCompressableFileType(ext, mimeType); mimeType == "" && !iAmSure || shouldBeCompressed && iAmSure { + // println("ext", ext, "iAmSure", iAmSure, "shouldBeCompressed", shouldBeCompressed, "mimeType", pu.MimeType) + if compressedData, err := util.GzipData(pu.Data); err == nil { + if len(compressedData)*10 < len(pu.Data)*9 { + pu.Data = compressedData + pu.IsGzipped = true + } + // println("gzipped data size", len(compressedData)) + } + } + } + + // md5 + h := md5.New() + h.Write(pu.UncompressedData) + pu.ContentMd5 = base64.StdEncoding.EncodeToString(h.Sum(nil)) + if expectedChecksum := r.Header.Get("Content-MD5"); expectedChecksum != "" { + if expectedChecksum != pu.ContentMd5 { + e = fmt.Errorf("Content-MD5 did not match md5 of file data expected [%s] received [%s] size %d", expectedChecksum, pu.ContentMd5, len(pu.UncompressedData)) + return + } + } + + return +} + +func parsePut(r *http.Request, sizeLimit int64, pu *ParsedUpload) (e error) { + pu.IsGzipped = r.Header.Get("Content-Encoding") == "gzip" + // pu.IsZstd = r.Header.Get("Content-Encoding") == "zstd" + pu.MimeType = r.Header.Get("Content-Type") + pu.FileName = "" + pu.Data, e = ioutil.ReadAll(io.LimitReader(r.Body, sizeLimit+1)) + if e == io.EOF || int64(pu.OriginalDataSize) == sizeLimit+1 { + io.Copy(ioutil.Discard, r.Body) + } + r.Body.Close() + return nil +} + +func 
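Two details of `ParseUpload` above are easy to miss: freshly gzipped data is kept only when it beats the raw size by more than 10% (the `len(compressedData)*10 < len(pu.Data)*9` test), and a `Content-MD5` header, when present, is verified against the uncompressed bytes. A small sketch of the acceptance rule; the helper name is illustrative:

```go
// worthCompressing mirrors the ratio test in ParseUpload: keep the gzip
// output only if it is more than 10% smaller than the original payload.
func worthCompressing(rawLen, gzippedLen int) bool {
	return gzippedLen*10 < rawLen*9 // i.e. gzippedLen < 0.9 * rawLen
}

// worthCompressing(1000, 899) == true  -> store compressed, set IsGzipped
// worthCompressing(1000, 950) == false -> store the raw bytes instead
```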
parseMultipart(r *http.Request, sizeLimit int64, pu *ParsedUpload) (e error) {
+	defer func() {
+		if e != nil && r.Body != nil {
+			io.Copy(ioutil.Discard, r.Body)
+			r.Body.Close()
+		}
+	}()
+	form, fe := r.MultipartReader()
+	if fe != nil {
+		glog.V(0).Infoln("MultipartReader [ERROR]", fe)
+		e = fe
+		return
+	}
+
+	// first multi-part item
+	part, fe := form.NextPart()
+	if fe != nil {
+		glog.V(0).Infoln("Reading Multi part [ERROR]", fe)
+		e = fe
+		return
+	}
+
+	pu.FileName = part.FileName()
+	if pu.FileName != "" {
+		pu.FileName = path.Base(pu.FileName)
+	}
+
+	pu.Data, e = ioutil.ReadAll(io.LimitReader(part, sizeLimit+1))
+	if e != nil {
+		glog.V(0).Infoln("Reading Content [ERROR]", e)
+		return
+	}
+	if len(pu.Data) == int(sizeLimit)+1 {
+		e = fmt.Errorf("file over the limit of %d bytes", sizeLimit)
+		return
+	}
+
+	// if the filename is empty string, do a search on the other multi-part items
+	for pu.FileName == "" {
+		part2, fe := form.NextPart()
+		if fe != nil {
+			break // no more or on error, just safely break
+		}
+
+		fName := part2.FileName()
+
+		// found the first <file type> multi-part has filename
+		if fName != "" {
+			data2, fe2 := ioutil.ReadAll(io.LimitReader(part2, sizeLimit+1))
+			if fe2 != nil {
+				glog.V(0).Infoln("Reading Content [ERROR]", fe2)
+				e = fe2
+				return
+			}
+			if len(data2) == int(sizeLimit)+1 {
+				e = fmt.Errorf("file over the limit of %d bytes", sizeLimit)
+				return
+			}
+
+			// update
+			pu.Data = data2
+			pu.FileName = path.Base(fName)
+			break
+		}
+	}
+
+	pu.IsChunkedFile, _ = strconv.ParseBool(r.FormValue("cm"))
+
+	if !pu.IsChunkedFile {
+
+		dotIndex := strings.LastIndex(pu.FileName, ".")
+		ext, mtype := "", ""
+		if dotIndex > 0 {
+			ext = strings.ToLower(pu.FileName[dotIndex:])
+			mtype = mime.TypeByExtension(ext)
+		}
+		contentType := part.Header.Get("Content-Type")
+		if contentType != "" && contentType != "application/octet-stream" && mtype != contentType {
+			pu.MimeType = contentType // only return mime type if not deducible
+			mtype = contentType
+		}
+
+	}
+	pu.IsGzipped = part.Header.Get("Content-Encoding") == "gzip"
+	// pu.IsZstd = part.Header.Get("Content-Encoding") == "zstd"
+
+	return
+}
diff --git a/weed/storage/needle/needle_read_write.go b/weed/storage/needle/needle_read_write.go
index 7f8aa4823..16c2fd06b 100644
--- a/weed/storage/needle/needle_read_write.go
+++ b/weed/storage/needle/needle_read_write.go
@@ -3,17 +3,16 @@ package needle
 import (
 	"errors"
 	"fmt"
-	"io"
-	"math"
-
 	"github.com/chrislusf/seaweedfs/weed/glog"
 	"github.com/chrislusf/seaweedfs/weed/storage/backend"
 	.
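Both `parsePut` and `parseMultipart` above bound memory with the same trick: read at most `sizeLimit+1` bytes, and treat the presence of that extra byte as proof the upload exceeded the limit, without ever buffering an unbounded stream. A self-contained sketch with an illustrative helper name:

```go
package main

import (
	"fmt"
	"io"
	"io/ioutil"
	"strings"
)

// readWithLimit reads at most sizeLimit+1 bytes; receiving the extra byte
// proves the body is too large, so the caller can reject it early.
func readWithLimit(r io.Reader, sizeLimit int64) ([]byte, error) {
	data, err := ioutil.ReadAll(io.LimitReader(r, sizeLimit+1))
	if err != nil {
		return nil, err
	}
	if int64(len(data)) == sizeLimit+1 {
		return nil, fmt.Errorf("body exceeds the %d byte limit", sizeLimit)
	}
	return data, nil
}

func main() {
	_, err := readWithLimit(strings.NewReader("0123456789"), 4)
	fmt.Println(err) // body exceeds the 4 byte limit
}
```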
"github.com/chrislusf/seaweedfs/weed/storage/types" "github.com/chrislusf/seaweedfs/weed/util" + "io" + "math" ) const ( - FlagGzip = 0x01 + FlagIsCompressed = 0x01 FlagHasName = 0x02 FlagHasMime = 0x04 FlagHasLastModifiedDate = 0x08 @@ -24,11 +23,13 @@ const ( TtlBytesLength = 2 ) +var ErrorSizeMismatch = errors.New("size mismatch") + func (n *Needle) DiskSize(version Version) int64 { return GetActualSize(n.Size, version) } -func (n *Needle) prepareWriteBuffer(version Version) ([]byte, uint32, int64, error) { +func (n *Needle) prepareWriteBuffer(version Version) ([]byte, Size, int64, error) { writeBytes := make([]byte, 0) @@ -37,8 +38,8 @@ func (n *Needle) prepareWriteBuffer(version Version) ([]byte, uint32, int64, err header := make([]byte, NeedleHeaderSize) CookieToBytes(header[0:CookieSize], n.Cookie) NeedleIdToBytes(header[CookieSize:CookieSize+NeedleIdSize], n.Id) - n.Size = uint32(len(n.Data)) - util.Uint32toBytes(header[CookieSize+NeedleIdSize:CookieSize+NeedleIdSize+SizeSize], n.Size) + n.Size = Size(len(n.Data)) + SizeToBytes(header[CookieSize+NeedleIdSize:CookieSize+NeedleIdSize+SizeSize], n.Size) size := n.Size actualSize := NeedleHeaderSize + int64(n.Size) writeBytes = append(writeBytes, header...) @@ -58,12 +59,12 @@ func (n *Needle) prepareWriteBuffer(version Version) ([]byte, uint32, int64, err } n.DataSize, n.MimeSize = uint32(len(n.Data)), uint8(len(n.Mime)) if n.DataSize > 0 { - n.Size = 4 + n.DataSize + 1 + n.Size = 4 + Size(n.DataSize) + 1 if n.HasName() { - n.Size = n.Size + 1 + uint32(n.NameSize) + n.Size = n.Size + 1 + Size(n.NameSize) } if n.HasMime() { - n.Size = n.Size + 1 + uint32(n.MimeSize) + n.Size = n.Size + 1 + Size(n.MimeSize) } if n.HasLastModifiedDate() { n.Size = n.Size + LastModifiedBytesLength @@ -72,12 +73,12 @@ func (n *Needle) prepareWriteBuffer(version Version) ([]byte, uint32, int64, err n.Size = n.Size + TtlBytesLength } if n.HasPairs() { - n.Size += 2 + uint32(n.PairsSize) + n.Size += 2 + Size(n.PairsSize) } } else { n.Size = 0 } - util.Uint32toBytes(header[CookieSize+NeedleIdSize:CookieSize+NeedleIdSize+SizeSize], n.Size) + SizeToBytes(header[CookieSize+NeedleIdSize:CookieSize+NeedleIdSize+SizeSize], n.Size) writeBytes = append(writeBytes, header[0:NeedleHeaderSize]...) if n.DataSize > 0 { util.Uint32toBytes(header[0:4], n.DataSize) @@ -119,13 +120,13 @@ func (n *Needle) prepareWriteBuffer(version Version) ([]byte, uint32, int64, err writeBytes = append(writeBytes, header[0:NeedleChecksumSize+TimestampSize+padding]...) } - return writeBytes, n.DataSize, GetActualSize(n.Size, version), nil + return writeBytes, Size(n.DataSize), GetActualSize(n.Size, version), nil } return writeBytes, 0, 0, fmt.Errorf("Unsupported Version! 
(%d)", version) } -func (n *Needle) Append(w backend.BackendStorageFile, version Version) (offset uint64, size uint32, actualSize int64, err error) { +func (n *Needle) Append(w backend.BackendStorageFile, version Version) (offset uint64, size Size, actualSize int64, err error) { if end, _, e := w.GetStat(); e == nil { defer func(w backend.BackendStorageFile, off int64) { @@ -140,6 +141,10 @@ func (n *Needle) Append(w backend.BackendStorageFile, version Version) (offset u err = fmt.Errorf("Cannot Read Current Volume Position: %v", e) return } + if offset >= MaxPossibleVolumeSize { + err = fmt.Errorf("Volume Size %d Exeededs %d", offset, MaxPossibleVolumeSize) + return + } bytesToWrite, size, actualSize, err := n.prepareWriteBuffer(version) @@ -150,21 +155,63 @@ func (n *Needle) Append(w backend.BackendStorageFile, version Version) (offset u return offset, size, actualSize, err } -func ReadNeedleBlob(r backend.BackendStorageFile, offset int64, size uint32, version Version) (dataSlice []byte, err error) { +func WriteNeedleBlob(w backend.BackendStorageFile, dataSlice []byte, size Size, appendAtNs uint64, version Version) (offset uint64, err error) { + + if end, _, e := w.GetStat(); e == nil { + defer func(w backend.BackendStorageFile, off int64) { + if err != nil { + if te := w.Truncate(end); te != nil { + glog.V(0).Infof("Failed to truncate %s back to %d with error: %v", w.Name(), end, te) + } + } + }(w, end) + offset = uint64(end) + } else { + err = fmt.Errorf("Cannot Read Current Volume Position: %v", e) + return + } + + if version == Version3 { + tsOffset := NeedleHeaderSize + size + NeedleChecksumSize + util.Uint64toBytes(dataSlice[tsOffset:tsOffset+TimestampSize], appendAtNs) + } + + if err == nil { + _, err = w.WriteAt(dataSlice, int64(offset)) + } + + return + +} + +func ReadNeedleBlob(r backend.BackendStorageFile, offset int64, size Size, version Version) (dataSlice []byte, err error) { dataSize := GetActualSize(size, version) dataSlice = make([]byte, int(dataSize)) - _, err = r.ReadAt(dataSlice, offset) + var n int + n, err = r.ReadAt(dataSlice, offset) + if err != nil && int64(n) == dataSize { + err = nil + } + if err != nil { + fileSize, _, _ := r.GetStat() + println("n", n, "dataSize", dataSize, "offset", offset, "fileSize", fileSize) + } return dataSlice, err } // ReadBytes hydrates the needle from the bytes buffer, with only n.Id is set. -func (n *Needle) ReadBytes(bytes []byte, offset int64, size uint32, version Version) (err error) { +func (n *Needle) ReadBytes(bytes []byte, offset int64, size Size, version Version) (err error) { n.ParseNeedleHeader(bytes) if n.Size != size { - return fmt.Errorf("entry not found: offset %d found id %d size %d, expected size %d", offset, n.Id, n.Size, size) + // cookie is not always passed in for this API. Use size to do preliminary checking. + if OffsetSize == 4 && offset < int64(MaxPossibleVolumeSize) { + glog.Errorf("entry not found1: offset %d found id %x size %d, expected size %d", offset, n.Id, n.Size, size) + return ErrorSizeMismatch + } + return fmt.Errorf("entry not found: offset %d found id %x size %d, expected size %d", offset, n.Id, n.Size, size) } switch version { case Version1: @@ -191,7 +238,7 @@ func (n *Needle) ReadBytes(bytes []byte, offset int64, size uint32, version Vers } // ReadData hydrates the needle from the file, with only n.Id is set. 
-func (n *Needle) ReadData(r backend.BackendStorageFile, offset int64, size uint32, version Version) (err error) { +func (n *Needle) ReadData(r backend.BackendStorageFile, offset int64, size Size, version Version) (err error) { bytes, err := ReadNeedleBlob(r, offset, size, version) if err != nil { return err @@ -202,7 +249,7 @@ func (n *Needle) ReadData(r backend.BackendStorageFile, offset int64, size uint3 func (n *Needle) ParseNeedleHeader(bytes []byte) { n.Cookie = BytesToCookie(bytes[0:CookieSize]) n.Id = BytesToNeedleId(bytes[CookieSize : CookieSize+NeedleIdSize]) - n.Size = util.BytesToUint32(bytes[CookieSize+NeedleIdSize : NeedleHeaderSize]) + n.Size = BytesToSize(bytes[CookieSize+NeedleIdSize : NeedleHeaderSize]) } func (n *Needle) readNeedleDataVersion2(bytes []byte) (err error) { @@ -284,7 +331,7 @@ func ReadNeedleHeader(r backend.BackendStorageFile, version Version, offset int6 return } -func PaddingLength(needleSize uint32, version Version) uint32 { +func PaddingLength(needleSize Size, version Version) Size { if version == Version3 { // this is same value as version2, but just listed here for clarity return NeedlePaddingSize - ((NeedleHeaderSize + needleSize + NeedleChecksumSize + TimestampSize) % NeedlePaddingSize) @@ -292,7 +339,7 @@ func PaddingLength(needleSize uint32, version Version) uint32 { return NeedlePaddingSize - ((NeedleHeaderSize + needleSize + NeedleChecksumSize) % NeedlePaddingSize) } -func NeedleBodyLength(needleSize uint32, version Version) int64 { +func NeedleBodyLength(needleSize Size, version Version) int64 { if version == Version3 { return int64(needleSize) + NeedleChecksumSize + TimestampSize + int64(PaddingLength(needleSize, version)) } @@ -339,11 +386,11 @@ func (n *Needle) ReadNeedleBodyBytes(needleBody []byte, version Version) (err er return } -func (n *Needle) IsGzipped() bool { - return n.Flags&FlagGzip > 0 +func (n *Needle) IsCompressed() bool { + return n.Flags&FlagIsCompressed > 0 } -func (n *Needle) SetGzipped() { - n.Flags = n.Flags | FlagGzip +func (n *Needle) SetIsCompressed() { + n.Flags = n.Flags | FlagIsCompressed } func (n *Needle) HasName() bool { return n.Flags&FlagHasName > 0 @@ -386,6 +433,6 @@ func (n *Needle) SetHasPairs() { n.Flags = n.Flags | FlagHasPairs } -func GetActualSize(size uint32, version Version) int64 { +func GetActualSize(size Size, version Version) int64 { return NeedleHeaderSize + NeedleBodyLength(size, version) } diff --git a/weed/storage/needle/needle_read_write_test.go b/weed/storage/needle/needle_read_write_test.go index 47582dd26..afcea5a05 100644 --- a/weed/storage/needle/needle_read_write_test.go +++ b/weed/storage/needle/needle_read_write_test.go @@ -48,7 +48,7 @@ func TestAppend(t *testing.T) { int64 : -9223372036854775808 to 9223372036854775807 */ - fileSize := int64(4294967295) + 10000 + fileSize := int64(4294967296) + 10000 tempFile.Truncate(fileSize) defer func() { tempFile.Close() diff --git a/weed/storage/needle/volume_ttl.go b/weed/storage/needle/volume_ttl.go index 4a169870d..d0de3768e 100644 --- a/weed/storage/needle/volume_ttl.go +++ b/weed/storage/needle/volume_ttl.go @@ -1,11 +1,12 @@ package needle import ( + "fmt" "strconv" ) const ( - //stored unit types + // stored unit types Empty byte = iota Minute Hour @@ -69,6 +70,9 @@ func (t *TTL) ToBytes(output []byte) { } func (t *TTL) ToUint32() (output uint32) { + if t == nil || t.Count == 0 { + return 0 + } output = uint32(t.Count) << 8 output += uint32(t.Unit) return output @@ -130,9 +134,49 @@ func (t TTL) Minutes() uint32 { case Week: return 
uint32(t.Count) * 60 * 24 * 7 case Month: - return uint32(t.Count) * 60 * 24 * 31 + return uint32(t.Count) * 60 * 24 * 30 case Year: return uint32(t.Count) * 60 * 24 * 365 } return 0 } + +func SecondsToTTL(seconds int32) string { + if seconds == 0 { + return "" + } + if seconds%(3600*24*365) == 0 && seconds/(3600*24*365) < 256 { + return fmt.Sprintf("%dy", seconds/(3600*24*365)) + } + if seconds%(3600*24*30) == 0 && seconds/(3600*24*30) < 256 { + return fmt.Sprintf("%dM", seconds/(3600*24*30)) + } + if seconds%(3600*24*7) == 0 && seconds/(3600*24*7) < 256 { + return fmt.Sprintf("%dw", seconds/(3600*24*7)) + } + if seconds%(3600*24) == 0 && seconds/(3600*24) < 256 { + return fmt.Sprintf("%dd", seconds/(3600*24)) + } + if seconds%(3600) == 0 && seconds/(3600) < 256 { + return fmt.Sprintf("%dh", seconds/(3600)) + } + if seconds/60 < 256 { + return fmt.Sprintf("%dm", seconds/60) + } + if seconds/(3600) < 256 { + return fmt.Sprintf("%dh", seconds/(3600)) + } + if seconds/(3600*24) < 256 { + return fmt.Sprintf("%dd", seconds/(3600*24)) + } + if seconds/(3600*24*7) < 256 { + return fmt.Sprintf("%dw", seconds/(3600*24*7)) + } + if seconds/(3600*24*30) < 256 { + return fmt.Sprintf("%dM", seconds/(3600*24*30)) + } + if seconds/(3600*24*365) < 256 { + return fmt.Sprintf("%dy", seconds/(3600*24*365)) + } + return "" +} diff --git a/weed/storage/needle/volume_ttl_test.go b/weed/storage/needle/volume_ttl_test.go index 0afebebf5..150d06e6e 100644 --- a/weed/storage/needle/volume_ttl_test.go +++ b/weed/storage/needle/volume_ttl_test.go @@ -30,13 +30,18 @@ func TestTTLReadWrite(t *testing.T) { t.Errorf("5d ttl:%v", ttl) } + ttl, _ = ReadTTL("50d") + if ttl.Minutes() != 50*24*60 { + t.Errorf("50d ttl:%v", ttl) + } + ttl, _ = ReadTTL("5w") if ttl.Minutes() != 5*7*24*60 { t.Errorf("5w ttl:%v", ttl) } ttl, _ = ReadTTL("5M") - if ttl.Minutes() != 5*31*24*60 { + if ttl.Minutes() != 5*30*24*60 { t.Errorf("5M ttl:%v", ttl) } diff --git a/weed/storage/needle_map.go b/weed/storage/needle_map.go index 77d081ea7..d35391f66 100644 --- a/weed/storage/needle_map.go +++ b/weed/storage/needle_map.go @@ -1,25 +1,26 @@ package storage import ( - "fmt" + "io" "os" "sync" + "github.com/chrislusf/seaweedfs/weed/storage/idx" "github.com/chrislusf/seaweedfs/weed/storage/needle_map" . 
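`SecondsToTTL` above first tries each unit as an exact multiple (largest unit first), then falls back to the largest unit whose count still fits in a single byte. A few evaluations, assuming the function is imported from the `needle` package:

```go
// Illustrative calls against SecondsToTTL as defined above:
fmt.Println(needle.SecondsToTTL(0))     // ""   (zero means "no TTL")
fmt.Println(needle.SecondsToTTL(86400)) // "1d" (exact multiple of a day, count < 256)
fmt.Println(needle.SecondsToTTL(90))    // "1m" (no exact unit; falls back to minutes, rounding down)
```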
"github.com/chrislusf/seaweedfs/weed/storage/types" ) -type NeedleMapType int +type NeedleMapKind int const ( - NeedleMapInMemory NeedleMapType = iota + NeedleMapInMemory NeedleMapKind = iota NeedleMapLevelDb // small memory footprint, 4MB total, 1 write buffer, 3 block buffer NeedleMapLevelDbMedium // medium memory footprint, 8MB total, 3 write buffer, 5 block buffer NeedleMapLevelDbLarge // large memory footprint, 12MB total, 4write buffer, 8 block buffer ) type NeedleMapper interface { - Put(key NeedleId, offset Offset, size uint32) error + Put(key NeedleId, offset Offset, size Size) error Get(key NeedleId) (element *needle_map.NeedleValue, ok bool) Delete(key NeedleId, offset Offset) error Close() @@ -30,6 +31,8 @@ type NeedleMapper interface { DeletedCount() int MaxFileKey() NeedleId IndexFileSize() uint64 + Sync() error + ReadIndexEntry(n int64) (key NeedleId, offset Offset, size Size, err error) } type baseNeedleMapper struct { @@ -37,6 +40,7 @@ type baseNeedleMapper struct { indexFile *os.File indexFileAccessLock sync.Mutex + indexFileOffset int64 } func (nm *baseNeedleMapper) IndexFileSize() uint64 { @@ -47,15 +51,35 @@ func (nm *baseNeedleMapper) IndexFileSize() uint64 { return 0 } -func (nm *baseNeedleMapper) appendToIndexFile(key NeedleId, offset Offset, size uint32) error { +func (nm *baseNeedleMapper) appendToIndexFile(key NeedleId, offset Offset, size Size) error { bytes := needle_map.ToBytes(key, offset, size) nm.indexFileAccessLock.Lock() defer nm.indexFileAccessLock.Unlock() - if _, err := nm.indexFile.Seek(0, 2); err != nil { - return fmt.Errorf("cannot seek end of indexfile %s: %v", - nm.indexFile.Name(), err) + written, err := nm.indexFile.WriteAt(bytes, nm.indexFileOffset) + if err == nil { + nm.indexFileOffset += int64(written) } - _, err := nm.indexFile.Write(bytes) return err } + +func (nm *baseNeedleMapper) Sync() error { + return nm.indexFile.Sync() +} + +func (nm *baseNeedleMapper) ReadIndexEntry(n int64) (key NeedleId, offset Offset, size Size, err error) { + bytes := make([]byte, NeedleMapEntrySize) + var readCount int + if readCount, err = nm.indexFile.ReadAt(bytes, n*NeedleMapEntrySize); err != nil { + if err == io.EOF { + if readCount == NeedleMapEntrySize { + err = nil + } + } + if err != nil { + return + } + } + key, offset, size = idx.IdxFileEntry(bytes) + return +} diff --git a/weed/storage/needle_map/compact_map.go b/weed/storage/needle_map/compact_map.go index 76783d0b0..2b1a471bc 100644 --- a/weed/storage/needle_map/compact_map.go +++ b/weed/storage/needle_map/compact_map.go @@ -18,7 +18,7 @@ const SectionalNeedleIdLimit = 1<<32 - 1 type SectionalNeedleValue struct { Key SectionalNeedleId OffsetLower OffsetLower `comment:"Volume offset"` //since aligned to 8 bytes, range is 4G*8=32G - Size uint32 `comment:"Size of the data portion"` + Size Size `comment:"Size of the data portion"` } type SectionalNeedleValueExtra struct { @@ -50,7 +50,7 @@ func NewCompactSection(start NeedleId) *CompactSection { } //return old entry size -func (cs *CompactSection) Set(key NeedleId, offset Offset, size uint32) (oldOffset Offset, oldSize uint32) { +func (cs *CompactSection) Set(key NeedleId, offset Offset, size Size) (oldOffset Offset, oldSize Size) { cs.Lock() if key > cs.end { cs.end = key @@ -80,7 +80,7 @@ func (cs *CompactSection) Set(key NeedleId, offset Offset, size uint32) (oldOffs return } -func (cs *CompactSection) setOverflowEntry(skey SectionalNeedleId, offset Offset, size uint32) { +func (cs *CompactSection) setOverflowEntry(skey SectionalNeedleId, offset 
Offset, size Size) { needleValue := SectionalNeedleValue{Key: skey, OffsetLower: offset.OffsetLower, Size: size} needleValueExtra := SectionalNeedleValueExtra{OffsetHigher: offset.OffsetHigher} insertCandidate := sort.Search(len(cs.overflow), func(i int) bool { @@ -115,24 +115,21 @@ func (cs *CompactSection) deleteOverflowEntry(key SectionalNeedleId) { return cs.overflow[i].Key >= key }) if deleteCandidate != length && cs.overflow[deleteCandidate].Key == key { - for i := deleteCandidate; i < length-1; i++ { - cs.overflow[i] = cs.overflow[i+1] - cs.overflowExtra[i] = cs.overflowExtra[i+1] + if cs.overflow[deleteCandidate].Size.IsValid() { + cs.overflow[deleteCandidate].Size = -cs.overflow[deleteCandidate].Size } - cs.overflow = cs.overflow[0 : length-1] - cs.overflowExtra = cs.overflowExtra[0 : length-1] } } //return old entry size -func (cs *CompactSection) Delete(key NeedleId) uint32 { +func (cs *CompactSection) Delete(key NeedleId) Size { skey := SectionalNeedleId(key - cs.start) cs.Lock() - ret := uint32(0) + ret := Size(0) if i := cs.binarySearchValues(skey); i >= 0 { - if cs.values[i].Size > 0 && cs.values[i].Size != TombstoneFileSize { + if cs.values[i].Size > 0 && cs.values[i].Size.IsValid() { ret = cs.values[i].Size - cs.values[i].Size = TombstoneFileSize + cs.values[i].Size = -cs.values[i].Size } } if _, v, found := cs.findOverflowEntry(skey); found { @@ -181,7 +178,7 @@ func NewCompactMap() *CompactMap { return &CompactMap{} } -func (cm *CompactMap) Set(key NeedleId, offset Offset, size uint32) (oldOffset Offset, oldSize uint32) { +func (cm *CompactMap) Set(key NeedleId, offset Offset, size Size) (oldOffset Offset, oldSize Size) { x := cm.binarySearchCompactSection(key) if x < 0 || (key-cm.list[x].start) > SectionalNeedleIdLimit { // println(x, "adding to existing", len(cm.list), "sections, starting", key) @@ -204,10 +201,10 @@ func (cm *CompactMap) Set(key NeedleId, offset Offset, size uint32) (oldOffset O // println(key, "set to section[", x, "].start", cm.list[x].start) return cm.list[x].Set(key, offset, size) } -func (cm *CompactMap) Delete(key NeedleId) uint32 { +func (cm *CompactMap) Delete(key NeedleId) Size { x := cm.binarySearchCompactSection(key) if x < 0 { - return uint32(0) + return Size(0) } return cm.list[x].Delete(key) } diff --git a/weed/storage/needle_map/compact_map_perf_test.go b/weed/storage/needle_map/compact_map_perf_test.go index 3a3648641..081fb34e9 100644 --- a/weed/storage/needle_map/compact_map_perf_test.go +++ b/weed/storage/needle_map/compact_map_perf_test.go @@ -9,7 +9,6 @@ import ( "time" . 
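`deleteOverflowEntry` above no longer shifts the overflow slice down on every delete; it flips the entry's `Size` to a negative value, and `Delete` does the same for in-section values, so a tombstone keeps both its slot and its original magnitude. A sketch of the convention; the real `Size` type lives in `storage/types`, so the definitions here are assumptions:

```go
// Assumed shape of the Size type; the negation above requires it to be signed.
type Size int32

func (s Size) IsValid() bool   { return s > 0 }
func (s Size) IsDeleted() bool { return s < 0 } // SeaweedFS also treats the legacy TombstoneFileSize as deleted

// markDeleted flips a live entry to its tombstone form in place, so the
// overflow slice never has to be compacted just to record a delete.
func markDeleted(s Size) Size {
	if s.IsValid() {
		return -s
	}
	return s
}
```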
"github.com/chrislusf/seaweedfs/weed/storage/types" - "github.com/chrislusf/seaweedfs/weed/util" ) /* @@ -32,7 +31,7 @@ func TestMemoryUsage(t *testing.T) { startTime := time.Now() for i := 0; i < 10; i++ { - indexFile, ie := os.OpenFile("../../../test/sample.idx", os.O_RDWR|os.O_RDONLY, 0644) + indexFile, ie := os.OpenFile("../../../test/data/sample.idx", os.O_RDWR|os.O_RDONLY, 0644) if ie != nil { log.Fatalln(ie) } @@ -60,7 +59,7 @@ func loadNewNeedleMap(file *os.File) (*CompactMap, uint64) { rowCount++ key := BytesToNeedleId(bytes[i : i+NeedleIdSize]) offset := BytesToOffset(bytes[i+NeedleIdSize : i+NeedleIdSize+OffsetSize]) - size := util.BytesToUint32(bytes[i+NeedleIdSize+OffsetSize : i+NeedleIdSize+OffsetSize+SizeSize]) + size := BytesToSize(bytes[i+NeedleIdSize+OffsetSize : i+NeedleIdSize+OffsetSize+SizeSize]) if !offset.IsZero() { m.Set(NeedleId(key), offset, size) diff --git a/weed/storage/needle_map/compact_map_test.go b/weed/storage/needle_map/compact_map_test.go index 7eea3969a..199cb26b3 100644 --- a/weed/storage/needle_map/compact_map_test.go +++ b/weed/storage/needle_map/compact_map_test.go @@ -49,7 +49,7 @@ func TestIssue52(t *testing.T) { func TestCompactMap(t *testing.T) { m := NewCompactMap() for i := uint32(0); i < 100*batch; i += 2 { - m.Set(NeedleId(i), ToOffset(int64(i)), i) + m.Set(NeedleId(i), ToOffset(int64(i)), Size(i)) } for i := uint32(0); i < 100*batch; i += 37 { @@ -57,7 +57,7 @@ func TestCompactMap(t *testing.T) { } for i := uint32(0); i < 10*batch; i += 3 { - m.Set(NeedleId(i), ToOffset(int64(i+11)), i+5) + m.Set(NeedleId(i), ToOffset(int64(i+11)), Size(i+5)) } // for i := uint32(0); i < 100; i++ { @@ -72,15 +72,15 @@ func TestCompactMap(t *testing.T) { if !ok { t.Fatal("key", i, "missing!") } - if v.Size != i+5 { + if v.Size != Size(i+5) { t.Fatal("key", i, "size", v.Size) } } else if i%37 == 0 { - if ok && v.Size != TombstoneFileSize { + if ok && v.Size.IsValid() { t.Fatal("key", i, "should have been deleted needle value", v) } } else if i%2 == 0 { - if v.Size != i { + if v.Size != Size(i) { t.Fatal("key", i, "size", v.Size) } } @@ -89,14 +89,14 @@ func TestCompactMap(t *testing.T) { for i := uint32(10 * batch); i < 100*batch; i++ { v, ok := m.Get(NeedleId(i)) if i%37 == 0 { - if ok && v.Size != TombstoneFileSize { + if ok && v.Size.IsValid() { t.Fatal("key", i, "should have been deleted needle value", v) } } else if i%2 == 0 { if v == nil { t.Fatal("key", i, "missing") } - if v.Size != i { + if v.Size != Size(i) { t.Fatal("key", i, "size", v.Size) } } @@ -129,8 +129,8 @@ func TestOverflow(t *testing.T) { cs.deleteOverflowEntry(4) - if len(cs.overflow) != 4 { - t.Fatalf("expecting 4 entries now: %+v", cs.overflow) + if len(cs.overflow) != 5 { + t.Fatalf("expecting 5 entries now: %+v", cs.overflow) } _, x, _ := cs.findOverflowEntry(5) @@ -146,7 +146,7 @@ func TestOverflow(t *testing.T) { cs.deleteOverflowEntry(1) for i, x := range cs.overflow { - println("overflow[", i, "]:", x.Key) + println("overflow[", i, "]:", x.Key, "size", x.Size) } println() diff --git a/weed/storage/needle_map/memdb.go b/weed/storage/needle_map/memdb.go index 6aba6adeb..ba1fd3d1e 100644 --- a/weed/storage/needle_map/memdb.go +++ b/weed/storage/needle_map/memdb.go @@ -2,6 +2,7 @@ package needle_map import ( "fmt" + "io" "os" "github.com/syndtr/goleveldb/leveldb" @@ -11,7 +12,6 @@ import ( "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/storage/idx" . 
"github.com/chrislusf/seaweedfs/weed/storage/types" - "github.com/chrislusf/seaweedfs/weed/util" ) //This map uses in memory level db @@ -32,7 +32,7 @@ func NewMemDb() *MemDb { return t } -func (cm *MemDb) Set(key NeedleId, offset Offset, size uint32) error { +func (cm *MemDb) Set(key NeedleId, offset Offset, size Size) error { bytes := ToBytes(key, offset, size) @@ -56,7 +56,7 @@ func (cm *MemDb) Get(key NeedleId) (*NeedleValue, bool) { return nil, false } offset := BytesToOffset(data[0:OffsetSize]) - size := util.BytesToUint32(data[OffsetSize : OffsetSize+SizeSize]) + size := BytesToSize(data[OffsetSize : OffsetSize+SizeSize]) return &NeedleValue{Key: key, Offset: offset, Size: size}, true } @@ -67,7 +67,7 @@ func (cm *MemDb) AscendingVisit(visit func(NeedleValue) error) (ret error) { key := BytesToNeedleId(iter.Key()) data := iter.Value() offset := BytesToOffset(data[0:OffsetSize]) - size := util.BytesToUint32(data[OffsetSize : OffsetSize+SizeSize]) + size := BytesToSize(data[OffsetSize : OffsetSize+SizeSize]) needle := NeedleValue{Key: key, Offset: offset, Size: size} ret = visit(needle) @@ -89,6 +89,9 @@ func (cm *MemDb) SaveToIdx(idxName string) (ret error) { defer idxFile.Close() return cm.AscendingVisit(func(value NeedleValue) error { + if value.Offset.IsZero() || value.Size.IsDeleted() { + return nil + } _, err := idxFile.Write(value.ToBytes()) return err }) @@ -102,11 +105,21 @@ func (cm *MemDb) LoadFromIdx(idxName string) (ret error) { } defer idxFile.Close() - return idx.WalkIndexFile(idxFile, func(key NeedleId, offset Offset, size uint32) error { - if offset.IsZero() || size == TombstoneFileSize { - return nil + return cm.LoadFromReaderAt(idxFile) + +} + +func (cm *MemDb) LoadFromReaderAt(readerAt io.ReaderAt) (ret error) { + + return idx.WalkIndexFile(readerAt, func(key NeedleId, offset Offset, size Size) error { + if offset.IsZero() || size.IsDeleted() { + return cm.Delete(key) } return cm.Set(key, offset, size) }) } + +func (cm *MemDb) Close() { + cm.db.Close() +} diff --git a/weed/storage/needle_map/memdb_test.go b/weed/storage/needle_map/memdb_test.go new file mode 100644 index 000000000..7b45d23f8 --- /dev/null +++ b/weed/storage/needle_map/memdb_test.go @@ -0,0 +1,23 @@ +package needle_map + +import ( + "testing" + + "github.com/chrislusf/seaweedfs/weed/storage/types" +) + +func BenchmarkMemDb(b *testing.B) { + b.ReportAllocs() + for i := 0; i < b.N; i++ { + nm := NewMemDb() + + nid := types.NeedleId(345) + offset := types.Offset{ + OffsetHigher: types.OffsetHigher{}, + OffsetLower: types.OffsetLower{}, + } + nm.Set(nid, offset, 324) + nm.Close() + } + +} diff --git a/weed/storage/needle_map/needle_value.go b/weed/storage/needle_map/needle_value.go index ef540b55e..f8d614660 100644 --- a/weed/storage/needle_map/needle_value.go +++ b/weed/storage/needle_map/needle_value.go @@ -9,7 +9,7 @@ import ( type NeedleValue struct { Key NeedleId Offset Offset `comment:"Volume offset"` //since aligned to 8 bytes, range is 4G*8=32G - Size uint32 `comment:"Size of the data portion"` + Size Size `comment:"Size of the data portion"` } func (this NeedleValue) Less(than btree.Item) bool { @@ -21,10 +21,10 @@ func (nv NeedleValue) ToBytes() []byte { return ToBytes(nv.Key, nv.Offset, nv.Size) } -func ToBytes(key NeedleId, offset Offset, size uint32) []byte { +func ToBytes(key NeedleId, offset Offset, size Size) []byte { bytes := make([]byte, NeedleIdSize+OffsetSize+SizeSize) NeedleIdToBytes(bytes[0:NeedleIdSize], key) OffsetToBytes(bytes[NeedleIdSize:NeedleIdSize+OffsetSize], offset) - 
util.Uint32toBytes(bytes[NeedleIdSize+OffsetSize:NeedleIdSize+OffsetSize+SizeSize], size) + util.Uint32toBytes(bytes[NeedleIdSize+OffsetSize:NeedleIdSize+OffsetSize+SizeSize], uint32(size)) return bytes } diff --git a/weed/storage/needle_map/needle_value_map.go b/weed/storage/needle_map/needle_value_map.go index 0a5a00ef7..a30cb96c4 100644 --- a/weed/storage/needle_map/needle_value_map.go +++ b/weed/storage/needle_map/needle_value_map.go @@ -5,8 +5,8 @@ import ( ) type NeedleValueMap interface { - Set(key NeedleId, offset Offset, size uint32) (oldOffset Offset, oldSize uint32) - Delete(key NeedleId) uint32 + Set(key NeedleId, offset Offset, size Size) (oldOffset Offset, oldSize Size) + Delete(key NeedleId) Size Get(key NeedleId) (*NeedleValue, bool) AscendingVisit(visit func(NeedleValue) error) error } diff --git a/weed/storage/needle_map_leveldb.go b/weed/storage/needle_map_leveldb.go index ef8571e83..31c86d124 100644 --- a/weed/storage/needle_map_leveldb.go +++ b/weed/storage/needle_map_leveldb.go @@ -5,14 +5,16 @@ import ( "os" "path/filepath" - "github.com/chrislusf/seaweedfs/weed/storage/idx" + "github.com/syndtr/goleveldb/leveldb/errors" "github.com/syndtr/goleveldb/leveldb/opt" + "github.com/chrislusf/seaweedfs/weed/storage/idx" + + "github.com/syndtr/goleveldb/leveldb" + "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/storage/needle_map" . "github.com/chrislusf/seaweedfs/weed/storage/types" - "github.com/chrislusf/seaweedfs/weed/util" - "github.com/syndtr/goleveldb/leveldb" ) type LevelDbNeedleMap struct { @@ -25,14 +27,24 @@ func NewLevelDbNeedleMap(dbFileName string, indexFile *os.File, opts *opt.Option m = &LevelDbNeedleMap{dbFileName: dbFileName} m.indexFile = indexFile if !isLevelDbFresh(dbFileName, indexFile) { - glog.V(0).Infof("Start to Generate %s from %s", dbFileName, indexFile.Name()) + glog.V(1).Infof("Start to Generate %s from %s", dbFileName, indexFile.Name()) generateLevelDbFile(dbFileName, indexFile) - glog.V(0).Infof("Finished Generating %s from %s", dbFileName, indexFile.Name()) + glog.V(1).Infof("Finished Generating %s from %s", dbFileName, indexFile.Name()) + } + if stat, err := indexFile.Stat(); err != nil { + glog.Fatalf("stat file %s: %v", indexFile.Name(), err) + } else { + m.indexFileOffset = stat.Size() } glog.V(1).Infof("Opening %s...", dbFileName) if m.db, err = leveldb.OpenFile(dbFileName, opts); err != nil { - return + if errors.IsCorrupted(err) { + m.db, err = leveldb.RecoverFile(dbFileName, opts) + } + if err != nil { + return + } } glog.V(1).Infof("Loading %s...", indexFile.Name()) mm, indexLoadError := newNeedleMapMetricFromIndexFile(indexFile) @@ -66,8 +78,8 @@ func generateLevelDbFile(dbFileName string, indexFile *os.File) error { return err } defer db.Close() - return idx.WalkIndexFile(indexFile, func(key NeedleId, offset Offset, size uint32) error { - if !offset.IsZero() && size != TombstoneFileSize { + return idx.WalkIndexFile(indexFile, func(key NeedleId, offset Offset, size Size) error { + if !offset.IsZero() && size.IsValid() { levelDbWrite(db, key, offset, size) } else { levelDbDelete(db, key) @@ -84,12 +96,12 @@ func (m *LevelDbNeedleMap) Get(key NeedleId) (element *needle_map.NeedleValue, o return nil, false } offset := BytesToOffset(data[0:OffsetSize]) - size := util.BytesToUint32(data[OffsetSize : OffsetSize+SizeSize]) + size := BytesToSize(data[OffsetSize : OffsetSize+SizeSize]) return &needle_map.NeedleValue{Key: key, Offset: offset, Size: size}, true } -func (m *LevelDbNeedleMap) Put(key NeedleId, 
offset Offset, size uint32) error { - var oldSize uint32 +func (m *LevelDbNeedleMap) Put(key NeedleId, offset Offset, size Size) error { + var oldSize Size if oldNeedle, ok := m.Get(key); ok { oldSize = oldNeedle.Size } @@ -101,7 +113,7 @@ func (m *LevelDbNeedleMap) Put(key NeedleId, offset Offset, size uint32) error { return levelDbWrite(m.db, key, offset, size) } -func levelDbWrite(db *leveldb.DB, key NeedleId, offset Offset, size uint32) error { +func levelDbWrite(db *leveldb.DB, key NeedleId, offset Offset, size Size) error { bytes := needle_map.ToBytes(key, offset, size) @@ -117,19 +129,34 @@ func levelDbDelete(db *leveldb.DB, key NeedleId) error { } func (m *LevelDbNeedleMap) Delete(key NeedleId, offset Offset) error { - if oldNeedle, ok := m.Get(key); ok { - m.logDelete(oldNeedle.Size) + oldNeedle, found := m.Get(key) + if !found || oldNeedle.Size.IsDeleted() { + return nil } + m.logDelete(oldNeedle.Size) + // write to index file first if err := m.appendToIndexFile(key, offset, TombstoneFileSize); err != nil { return err } - return levelDbDelete(m.db, key) + + return levelDbWrite(m.db, key, oldNeedle.Offset, -oldNeedle.Size) } func (m *LevelDbNeedleMap) Close() { - m.indexFile.Close() - m.db.Close() + indexFileName := m.indexFile.Name() + if err := m.indexFile.Sync(); err != nil { + glog.Warningf("sync file %s failed: %v", indexFileName, err) + } + if err := m.indexFile.Close(); err != nil { + glog.Warningf("close index file %s failed: %v", indexFileName, err) + } + + if m.db != nil { + if err := m.db.Close(); err != nil { + glog.Warningf("close levelDB failed: %v", err) + } + } } func (m *LevelDbNeedleMap) Destroy() error { diff --git a/weed/storage/needle_map_memory.go b/weed/storage/needle_map_memory.go index 37dee7889..1b58708c6 100644 --- a/weed/storage/needle_map_memory.go +++ b/weed/storage/needle_map_memory.go @@ -19,6 +19,11 @@ func NewCompactNeedleMap(file *os.File) *NeedleMap { m: needle_map.NewCompactMap(), } nm.indexFile = file + stat, err := file.Stat() + if err != nil { + glog.Fatalf("stat file %s: %v", file.Name(), err) + } + nm.indexFileOffset = stat.Size() return nm } @@ -28,13 +33,13 @@ func LoadCompactNeedleMap(file *os.File) (*NeedleMap, error) { } func doLoading(file *os.File, nm *NeedleMap) (*NeedleMap, error) { - e := idx.WalkIndexFile(file, func(key NeedleId, offset Offset, size uint32) error { + e := idx.WalkIndexFile(file, func(key NeedleId, offset Offset, size Size) error { nm.MaybeSetMaxFileKey(key) - if !offset.IsZero() && size != TombstoneFileSize { + if !offset.IsZero() && size.IsValid() { nm.FileCounter++ nm.FileByteCounter = nm.FileByteCounter + uint64(size) oldOffset, oldSize := nm.m.Set(NeedleId(key), offset, size) - if !oldOffset.IsZero() && oldSize != TombstoneFileSize { + if !oldOffset.IsZero() && oldSize.IsValid() { nm.DeletionCounter++ nm.DeletionByteCounter = nm.DeletionByteCounter + uint64(oldSize) } @@ -49,7 +54,7 @@ func doLoading(file *os.File, nm *NeedleMap) (*NeedleMap, error) { return nm, e } -func (nm *NeedleMap) Put(key NeedleId, offset Offset, size uint32) error { +func (nm *NeedleMap) Put(key NeedleId, offset Offset, size Size) error { _, oldSize := nm.m.Set(NeedleId(key), offset, size) nm.logPut(key, oldSize, size) return nm.appendToIndexFile(key, offset, size) @@ -64,6 +69,10 @@ func (nm *NeedleMap) Delete(key NeedleId, offset Offset) error { return nm.appendToIndexFile(key, offset, TombstoneFileSize) } func (nm *NeedleMap) Close() { + indexFileName := nm.indexFile.Name() + if err := nm.indexFile.Sync(); err != nil { + 
glog.Warningf("sync file %s failed, %v", indexFileName, err) + } _ = nm.indexFile.Close() } func (nm *NeedleMap) Destroy() error { diff --git a/weed/storage/needle_map_metric.go b/weed/storage/needle_map_metric.go index 823a04108..3618dada9 100644 --- a/weed/storage/needle_map_metric.go +++ b/weed/storage/needle_map_metric.go @@ -18,31 +18,31 @@ type mapMetric struct { MaximumFileKey uint64 `json:"MaxFileKey"` } -func (mm *mapMetric) logDelete(deletedByteCount uint32) { +func (mm *mapMetric) logDelete(deletedByteCount Size) { if mm == nil { return } mm.LogDeletionCounter(deletedByteCount) } -func (mm *mapMetric) logPut(key NeedleId, oldSize uint32, newSize uint32) { +func (mm *mapMetric) logPut(key NeedleId, oldSize Size, newSize Size) { if mm == nil { return } mm.MaybeSetMaxFileKey(key) mm.LogFileCounter(newSize) - if oldSize > 0 && oldSize != TombstoneFileSize { + if oldSize > 0 && oldSize.IsValid() { mm.LogDeletionCounter(oldSize) } } -func (mm *mapMetric) LogFileCounter(newSize uint32) { +func (mm *mapMetric) LogFileCounter(newSize Size) { if mm == nil { return } atomic.AddUint32(&mm.FileCounter, 1) atomic.AddUint64(&mm.FileByteCounter, uint64(newSize)) } -func (mm *mapMetric) LogDeletionCounter(oldSize uint32) { +func (mm *mapMetric) LogDeletionCounter(oldSize Size) { if mm == nil { return } @@ -97,11 +97,11 @@ func newNeedleMapMetricFromIndexFile(r *os.File) (mm *mapMetric, err error) { buf := make([]byte, NeedleIdSize) err = reverseWalkIndexFile(r, func(entryCount int64) { bf = bloom.NewWithEstimates(uint(entryCount), 0.001) - }, func(key NeedleId, offset Offset, size uint32) error { + }, func(key NeedleId, offset Offset, size Size) error { mm.MaybeSetMaxFileKey(key) NeedleIdToBytes(buf, key) - if size != TombstoneFileSize { + if size.IsValid() { mm.FileByteCounter += uint64(size) } @@ -111,7 +111,7 @@ func newNeedleMapMetricFromIndexFile(r *os.File) (mm *mapMetric, err error) { } else { // deleted file mm.DeletionCounter++ - if size != TombstoneFileSize { + if size.IsValid() { // previously already deleted file mm.DeletionByteCounter += uint64(size) } @@ -121,7 +121,7 @@ func newNeedleMapMetricFromIndexFile(r *os.File) (mm *mapMetric, err error) { return } -func reverseWalkIndexFile(r *os.File, initFn func(entryCount int64), fn func(key NeedleId, offset Offset, size uint32) error) error { +func reverseWalkIndexFile(r *os.File, initFn func(entryCount int64), fn func(key NeedleId, offset Offset, size Size) error) error { fi, err := r.Stat() if err != nil { return fmt.Errorf("file %s stat error: %v", r.Name(), err) diff --git a/weed/storage/needle_map_metric_test.go b/weed/storage/needle_map_metric_test.go index ae2177a30..362659a11 100644 --- a/weed/storage/needle_map_metric_test.go +++ b/weed/storage/needle_map_metric_test.go @@ -15,7 +15,7 @@ func TestFastLoadingNeedleMapMetrics(t *testing.T) { nm := NewCompactNeedleMap(idxFile) for i := 0; i < 10000; i++ { - nm.Put(Uint64ToNeedleId(uint64(i+1)), Uint32ToOffset(uint32(0)), uint32(1)) + nm.Put(Uint64ToNeedleId(uint64(i+1)), Uint32ToOffset(uint32(0)), Size(1)) if rand.Float32() < 0.2 { nm.Delete(Uint64ToNeedleId(uint64(rand.Int63n(int64(i))+1)), Uint32ToOffset(uint32(0))) } diff --git a/weed/storage/needle_map_sorted_file.go b/weed/storage/needle_map_sorted_file.go index e6f9258f3..662b90531 100644 --- a/weed/storage/needle_map_sorted_file.go +++ b/weed/storage/needle_map_sorted_file.go @@ -16,18 +16,18 @@ type SortedFileNeedleMap struct { dbFileSize int64 } -func NewSortedFileNeedleMap(baseFileName string, indexFile *os.File) (m 
*SortedFileNeedleMap, err error) { - m = &SortedFileNeedleMap{baseFileName: baseFileName} +func NewSortedFileNeedleMap(indexBaseFileName string, indexFile *os.File) (m *SortedFileNeedleMap, err error) { + m = &SortedFileNeedleMap{baseFileName: indexBaseFileName} m.indexFile = indexFile - fileName := baseFileName + ".sdx" + fileName := indexBaseFileName + ".sdx" if !isSortedFileFresh(fileName, indexFile) { glog.V(0).Infof("Start to Generate %s from %s", fileName, indexFile.Name()) - erasure_coding.WriteSortedFileFromIdx(baseFileName, ".sdx") + erasure_coding.WriteSortedFileFromIdx(indexBaseFileName, ".sdx") glog.V(0).Infof("Finished Generating %s from %s", fileName, indexFile.Name()) } glog.V(1).Infof("Opening %s...", fileName) - if m.dbFile, err = os.Open(baseFileName + ".sdx"); err != nil { + if m.dbFile, err = os.Open(indexBaseFileName + ".sdx"); err != nil { return } dbStat, _ := m.dbFile.Stat() @@ -65,7 +65,7 @@ func (m *SortedFileNeedleMap) Get(key NeedleId) (element *needle_map.NeedleValue } -func (m *SortedFileNeedleMap) Put(key NeedleId, offset Offset, size uint32) error { +func (m *SortedFileNeedleMap) Put(key NeedleId, offset Offset, size Size) error { return os.ErrInvalid } @@ -80,7 +80,7 @@ func (m *SortedFileNeedleMap) Delete(key NeedleId, offset Offset) error { return err } - if size == TombstoneFileSize { + if size.IsDeleted() { return nil } @@ -94,8 +94,12 @@ func (m *SortedFileNeedleMap) Delete(key NeedleId, offset Offset) error { } func (m *SortedFileNeedleMap) Close() { - m.indexFile.Close() - m.dbFile.Close() + if m.indexFile != nil { + m.indexFile.Close() + } + if m.dbFile != nil { + m.dbFile.Close() + } } func (m *SortedFileNeedleMap) Destroy() error { diff --git a/weed/storage/store.go b/weed/storage/store.go index 512f72ceb..6be15a4c9 100644 --- a/weed/storage/store.go +++ b/weed/storage/store.go @@ -2,13 +2,17 @@ package storage import ( "fmt" + "path/filepath" + "strings" "sync/atomic" "google.golang.org/grpc" "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/pb" "github.com/chrislusf/seaweedfs/weed/pb/master_pb" "github.com/chrislusf/seaweedfs/weed/stats" + "github.com/chrislusf/seaweedfs/weed/storage/erasure_coding" "github.com/chrislusf/seaweedfs/weed/storage/needle" "github.com/chrislusf/seaweedfs/weed/storage/super_block" . 
"github.com/chrislusf/seaweedfs/weed/storage/types" @@ -18,21 +22,25 @@ const ( MAX_TTL_VOLUME_REMOVAL_DELAY = 10 // 10 minutes ) +type ReadOption struct { + ReadDeleted bool +} + /* * A VolumeServer contains one Store */ type Store struct { MasterAddress string grpcDialOption grpc.DialOption - volumeSizeLimit uint64 //read from the master + volumeSizeLimit uint64 // read from the master Ip string Port int PublicUrl string Locations []*DiskLocation - dataCenter string //optional informaton, overwriting master setting if exists - rack string //optional information, overwriting master setting if exists + dataCenter string // optional informaton, overwriting master setting if exists + rack string // optional information, overwriting master setting if exists connected bool - NeedleMapType NeedleMapType + NeedleMapKind NeedleMapKind NewVolumesChan chan master_pb.VolumeShortInformationMessage DeletedVolumesChan chan master_pb.VolumeShortInformationMessage NewEcShardsChan chan master_pb.VolumeEcShardInformationMessage @@ -44,11 +52,11 @@ func (s *Store) String() (str string) { return } -func NewStore(grpcDialOption grpc.DialOption, port int, ip, publicUrl string, dirnames []string, maxVolumeCounts []int, needleMapKind NeedleMapType) (s *Store) { - s = &Store{grpcDialOption: grpcDialOption, Port: port, Ip: ip, PublicUrl: publicUrl, NeedleMapType: needleMapKind} +func NewStore(grpcDialOption grpc.DialOption, port int, ip, publicUrl string, dirnames []string, maxVolumeCounts []int, minFreeSpacePercents []float32, idxFolder string, needleMapKind NeedleMapKind, diskTypes []DiskType) (s *Store) { + s = &Store{grpcDialOption: grpcDialOption, Port: port, Ip: ip, PublicUrl: publicUrl, NeedleMapKind: needleMapKind} s.Locations = make([]*DiskLocation, 0) for i := 0; i < len(dirnames); i++ { - location := NewDiskLocation(dirnames[i], maxVolumeCounts[i]) + location := NewDiskLocation(dirnames[i], maxVolumeCounts[i], minFreeSpacePercents[i], idxFolder, diskTypes[i]) location.loadExistingVolumes(needleMapKind) s.Locations = append(s.Locations, location) stats.VolumeServerMaxVolumeCounter.Add(float64(maxVolumeCounts[i])) @@ -61,7 +69,7 @@ func NewStore(grpcDialOption grpc.DialOption, port int, ip, publicUrl string, di return } -func (s *Store) AddVolume(volumeId needle.VolumeId, collection string, needleMapKind NeedleMapType, replicaPlacement string, ttlString string, preallocate int64, MemoryMapMaxSizeMb uint32) error { +func (s *Store) AddVolume(volumeId needle.VolumeId, collection string, needleMapKind NeedleMapKind, replicaPlacement string, ttlString string, preallocate int64, MemoryMapMaxSizeMb uint32, diskType DiskType) error { rt, e := super_block.NewReplicaPlacementFromString(replicaPlacement) if e != nil { return e @@ -70,7 +78,7 @@ func (s *Store) AddVolume(volumeId needle.VolumeId, collection string, needleMap if e != nil { return e } - e = s.addVolume(volumeId, collection, needleMapKind, rt, ttl, preallocate, MemoryMapMaxSizeMb) + e = s.addVolume(volumeId, collection, needleMapKind, rt, ttl, preallocate, MemoryMapMaxSizeMb, diskType) return e } func (s *Store) DeleteCollection(collection string) (e error) { @@ -92,10 +100,19 @@ func (s *Store) findVolume(vid needle.VolumeId) *Volume { } return nil } -func (s *Store) FindFreeLocation() (ret *DiskLocation) { +func (s *Store) FindFreeLocation(diskType DiskType) (ret *DiskLocation) { max := 0 for _, location := range s.Locations { + if diskType != location.DiskType { + continue + } + if location.isDiskSpaceLow { + continue + } currentFreeCount := 
location.MaxVolumeCount - location.VolumesLen() + currentFreeCount *= erasure_coding.DataShardsCount + currentFreeCount -= location.EcVolumesLen() + currentFreeCount /= erasure_coding.DataShardsCount if currentFreeCount > max { max = currentFreeCount ret = location @@ -103,14 +120,14 @@ func (s *Store) FindFreeLocation() (ret *DiskLocation) { } return ret } -func (s *Store) addVolume(vid needle.VolumeId, collection string, needleMapKind NeedleMapType, replicaPlacement *super_block.ReplicaPlacement, ttl *needle.TTL, preallocate int64, memoryMapMaxSizeMb uint32) error { +func (s *Store) addVolume(vid needle.VolumeId, collection string, needleMapKind NeedleMapKind, replicaPlacement *super_block.ReplicaPlacement, ttl *needle.TTL, preallocate int64, memoryMapMaxSizeMb uint32, diskType DiskType) error { if s.findVolume(vid) != nil { return fmt.Errorf("Volume Id %d already exists!", vid) } - if location := s.FindFreeLocation(); location != nil { + if location := s.FindFreeLocation(diskType); location != nil { glog.V(0).Infof("In dir %s adds volume:%v collection:%s replicaPlacement:%v ttl:%v", location.Directory, vid, collection, replicaPlacement, ttl) - if volume, err := NewVolume(location.Directory, collection, vid, needleMapKind, replicaPlacement, ttl, preallocate, memoryMapMaxSizeMb); err == nil { + if volume, err := NewVolume(location.Directory, location.IdxDirectory, collection, vid, needleMapKind, replicaPlacement, ttl, preallocate, memoryMapMaxSizeMb); err == nil { location.SetVolume(vid, volume) glog.V(0).Infof("add volume %d", vid) s.NewVolumesChan <- master_pb.VolumeShortInformationMessage{ @@ -119,6 +136,7 @@ func (s *Store) addVolume(vid needle.VolumeId, collection string, needleMapKind ReplicaPlacement: uint32(replicaPlacement.Byte()), Version: uint32(volume.Version()), Ttl: ttl.ToUint32(), + DiskType: string(diskType), } return nil } else { @@ -128,64 +146,130 @@ func (s *Store) addVolume(vid needle.VolumeId, collection string, needleMapKind return fmt.Errorf("No more free space left") } -func (s *Store) VolumeInfos() []*VolumeInfo { - var stats []*VolumeInfo +func (s *Store) VolumeInfos() (allStats []*VolumeInfo) { for _, location := range s.Locations { - location.volumesLock.RLock() - for k, v := range location.volumes { - s := &VolumeInfo{ - Id: needle.VolumeId(k), - Size: v.ContentSize(), - Collection: v.Collection, - ReplicaPlacement: v.ReplicaPlacement, - Version: v.Version(), - FileCount: int(v.FileCount()), - DeleteCount: int(v.DeletedCount()), - DeletedByteCount: v.DeletedSize(), - ReadOnly: v.noWriteOrDelete || v.noWriteCanDelete, - Ttl: v.Ttl, - CompactRevision: uint32(v.CompactionRevision), - } - s.RemoteStorageName, s.RemoteStorageKey = v.RemoteStorageNameKey() - stats = append(stats, s) - } - location.volumesLock.RUnlock() + stats := collectStatsForOneLocation(location) + allStats = append(allStats, stats...) 
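`FindFreeLocation` above now counts free capacity at shard granularity: whole free volume slots are converted into data-shard units, the units already consumed by EC shards are subtracted, and integer division converts the result back to whole slots. A worked example, assuming `erasure_coding.DataShardsCount` is 10 (its usual SeaweedFS value):

```go
// freeVolumeSlots reproduces the arithmetic above with concrete numbers.
func freeVolumeSlots(maxVolumeCount, volumesLen, ecVolumesLen int) int {
	const dataShardsCount = 10          // assumed DataShardsCount
	free := maxVolumeCount - volumesLen // whole free volume slots
	free *= dataShardsCount             // ... expressed in shard-sized units
	free -= ecVolumesLen                // minus units already used by EC shards
	free /= dataShardsCount             // back to whole slots (integer division)
	return free
}

// freeVolumeSlots(7, 3, 12) == 2: four empty slots minus 12 tenths of a
// slot of EC shards still leaves two slots that are completely free.
```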
+ } + sortVolumeInfos(allStats) + return allStats +} + +func collectStatsForOneLocation(location *DiskLocation) (stats []*VolumeInfo) { + location.volumesLock.RLock() + defer location.volumesLock.RUnlock() + + for k, v := range location.volumes { + s := collectStatForOneVolume(k, v) + stats = append(stats, s) } - sortVolumeInfos(stats) return stats } +func collectStatForOneVolume(vid needle.VolumeId, v *Volume) (s *VolumeInfo) { + + s = &VolumeInfo{ + Id: vid, + Collection: v.Collection, + ReplicaPlacement: v.ReplicaPlacement, + Version: v.Version(), + ReadOnly: v.IsReadOnly(), + Ttl: v.Ttl, + CompactRevision: uint32(v.CompactionRevision), + DiskType: v.DiskType().String(), + } + s.RemoteStorageName, s.RemoteStorageKey = v.RemoteStorageNameKey() + + v.dataFileAccessLock.RLock() + defer v.dataFileAccessLock.RUnlock() + + if v.nm == nil { + return + } + + s.FileCount = v.nm.FileCount() + s.DeleteCount = v.nm.DeletedCount() + s.DeletedByteCount = v.nm.DeletedSize() + s.Size = v.nm.ContentSize() + + return +} + func (s *Store) SetDataCenter(dataCenter string) { s.dataCenter = dataCenter } func (s *Store) SetRack(rack string) { s.rack = rack } +func (s *Store) GetDataCenter() string { + return s.dataCenter +} +func (s *Store) GetRack() string { + return s.rack +} func (s *Store) CollectHeartbeat() *master_pb.Heartbeat { var volumeMessages []*master_pb.VolumeInformationMessage - maxVolumeCount := 0 + maxVolumeCounts := make(map[string]uint32) var maxFileKey NeedleId collectionVolumeSize := make(map[string]uint64) + collectionVolumeReadOnlyCount := make(map[string]map[string]uint8) for _, location := range s.Locations { var deleteVids []needle.VolumeId - maxVolumeCount = maxVolumeCount + location.MaxVolumeCount + maxVolumeCounts[string(location.DiskType)] += uint32(location.MaxVolumeCount) location.volumesLock.RLock() for _, v := range location.volumes { - if maxFileKey < v.MaxFileKey() { - maxFileKey = v.MaxFileKey() + curMaxFileKey, volumeMessage := v.ToVolumeInformationMessage() + if volumeMessage == nil { + continue + } + if maxFileKey < curMaxFileKey { + maxFileKey = curMaxFileKey } - if !v.expired(s.GetVolumeSizeLimit()) { - volumeMessages = append(volumeMessages, v.ToVolumeInformationMessage()) + deleteVolume := false + if !v.expired(volumeMessage.Size, s.GetVolumeSizeLimit()) { + volumeMessages = append(volumeMessages, volumeMessage) } else { if v.expiredLongEnough(MAX_TTL_VOLUME_REMOVAL_DELAY) { deleteVids = append(deleteVids, v.Id) + deleteVolume = true } else { - glog.V(0).Infoln("volume", v.Id, "is expired.") + glog.V(0).Infof("volume %d is expired", v.Id) + } + if v.lastIoError != nil { + deleteVids = append(deleteVids, v.Id) + deleteVolume = true + glog.Warningf("volume %d has IO error: %v", v.Id, v.lastIoError) + } + } + + if _, exist := collectionVolumeSize[v.Collection]; !exist { + collectionVolumeSize[v.Collection] = 0 + } + if !deleteVolume { + collectionVolumeSize[v.Collection] += volumeMessage.Size + } + + if _, exist := collectionVolumeReadOnlyCount[v.Collection]; !exist { + collectionVolumeReadOnlyCount[v.Collection] = map[string]uint8{ + "IsReadOnly": 0, + "noWriteOrDelete": 0, + "noWriteCanDelete": 0, + "isDiskSpaceLow": 0, + } + } + if !deleteVolume && v.IsReadOnly() { + collectionVolumeReadOnlyCount[v.Collection]["IsReadOnly"] += 1 + if v.noWriteOrDelete { + collectionVolumeReadOnlyCount[v.Collection]["noWriteOrDelete"] += 1 + } + if v.noWriteCanDelete { + collectionVolumeReadOnlyCount[v.Collection]["noWriteCanDelete"] += 1 + } + if v.location.isDiskSpaceLow { + 
collectionVolumeReadOnlyCount[v.Collection]["isDiskSpaceLow"] += 1 } } - fileSize, _, _ := v.FileStat() - collectionVolumeSize[v.Collection] += fileSize } location.volumesLock.RUnlock() @@ -193,8 +277,14 @@ func (s *Store) CollectHeartbeat() *master_pb.Heartbeat { // delete expired volumes. location.volumesLock.Lock() for _, vid := range deleteVids { - location.deleteVolumeById(vid) - glog.V(0).Infoln("volume", vid, "is deleted.") + found, err := location.deleteVolumeById(vid) + if err == nil { + if found { + glog.V(0).Infof("volume %d is deleted", vid) + } + } else { + glog.Warningf("delete volume %d: %v", vid, err) + } } location.volumesLock.Unlock() } @@ -204,16 +294,22 @@ func (s *Store) CollectHeartbeat() *master_pb.Heartbeat { stats.VolumeServerDiskSizeGauge.WithLabelValues(col, "normal").Set(float64(size)) } + for col, types := range collectionVolumeReadOnlyCount { + for t, count := range types { + stats.VolumeServerReadOnlyVolumeGauge.WithLabelValues(col, t).Set(float64(count)) + } + } + return &master_pb.Heartbeat{ - Ip: s.Ip, - Port: uint32(s.Port), - PublicUrl: s.PublicUrl, - MaxVolumeCount: uint32(maxVolumeCount), - MaxFileKey: NeedleIdToUint64(maxFileKey), - DataCenter: s.dataCenter, - Rack: s.rack, - Volumes: volumeMessages, - HasNoVolumes: len(volumeMessages) == 0, + Ip: s.Ip, + Port: uint32(s.Port), + PublicUrl: s.PublicUrl, + MaxVolumeCounts: maxVolumeCounts, + MaxFileKey: NeedleIdToUint64(maxFileKey), + DataCenter: s.dataCenter, + Rack: s.rack, + Volumes: volumeMessages, + HasNoVolumes: len(volumeMessages) == 0, } } @@ -224,17 +320,13 @@ func (s *Store) Close() { } } -func (s *Store) WriteVolumeNeedle(i needle.VolumeId, n *needle.Needle) (size uint32, isUnchanged bool, err error) { +func (s *Store) WriteVolumeNeedle(i needle.VolumeId, n *needle.Needle, fsync bool) (isUnchanged bool, err error) { if v := s.findVolume(i); v != nil { - if v.noWriteOrDelete || v.noWriteCanDelete { + if v.IsReadOnly() { err = fmt.Errorf("volume %d is read only", i) return } - if MaxPossibleVolumeSize >= v.ContentSize()+uint64(needle.GetActualSize(size, v.Version())) { - _, size, isUnchanged, err = v.writeNeedle(n) - } else { - err = fmt.Errorf("volume size limit %d exceeded! current size is %d", s.GetVolumeSizeLimit(), v.ContentSize()) - } + _, _, isUnchanged, err = v.writeNeedle2(n, fsync) return } glog.V(0).Infoln("volume", i, "not found!") @@ -242,23 +334,19 @@ func (s *Store) WriteVolumeNeedle(i needle.VolumeId, n *needle.Needle) (size uin return } -func (s *Store) DeleteVolumeNeedle(i needle.VolumeId, n *needle.Needle) (uint32, error) { +func (s *Store) DeleteVolumeNeedle(i needle.VolumeId, n *needle.Needle) (Size, error) { if v := s.findVolume(i); v != nil { if v.noWriteOrDelete { return 0, fmt.Errorf("volume %d is read only", i) } - if MaxPossibleVolumeSize >= v.ContentSize()+uint64(needle.GetActualSize(0, v.Version())) { - return v.deleteNeedle(n) - } else { - return 0, fmt.Errorf("volume size limit %d exceeded! 
current size is %d", s.GetVolumeSizeLimit(), v.ContentSize()) - } + return v.deleteNeedle2(n) } return 0, fmt.Errorf("volume %d not found on %s:%d", i, s.Ip, s.Port) } -func (s *Store) ReadVolumeNeedle(i needle.VolumeId, n *needle.Needle) (int, error) { +func (s *Store) ReadVolumeNeedle(i needle.VolumeId, n *needle.Needle, readOption *ReadOption) (int, error) { if v := s.findVolume(i); v != nil { - return v.readNeedle(n) + return v.readNeedle(n, readOption) } return 0, fmt.Errorf("volume %d not found", i) } @@ -276,13 +364,26 @@ func (s *Store) MarkVolumeReadonly(i needle.VolumeId) error { if v == nil { return fmt.Errorf("volume %d not found", i) } + v.noWriteLock.Lock() v.noWriteOrDelete = true + v.noWriteLock.Unlock() + return nil +} + +func (s *Store) MarkVolumeWritable(i needle.VolumeId) error { + v := s.findVolume(i) + if v == nil { + return fmt.Errorf("volume %d not found", i) + } + v.noWriteLock.Lock() + v.noWriteOrDelete = false + v.noWriteLock.Unlock() return nil } func (s *Store) MountVolume(i needle.VolumeId) error { for _, location := range s.Locations { - if found := location.LoadVolume(i, s.NeedleMapType); found == true { + if found := location.LoadVolume(i, s.NeedleMapKind); found == true { glog.V(0).Infof("mount volume %d", i) v := s.findVolume(i) s.NewVolumesChan <- master_pb.VolumeShortInformationMessage{ @@ -291,6 +392,7 @@ func (s *Store) MountVolume(i needle.VolumeId) error { ReplicaPlacement: uint32(v.ReplicaPlacement.Byte()), Version: uint32(v.Version()), Ttl: v.Ttl.ToUint32(), + DiskType: string(v.location.DiskType), } return nil } @@ -310,6 +412,7 @@ func (s *Store) UnmountVolume(i needle.VolumeId) error { ReplicaPlacement: uint32(v.ReplicaPlacement.Byte()), Version: uint32(v.Version()), Ttl: v.Ttl.ToUint32(), + DiskType: string(v.location.DiskType), } for _, location := range s.Locations { @@ -326,7 +429,7 @@ func (s *Store) UnmountVolume(i needle.VolumeId) error { func (s *Store) DeleteVolume(i needle.VolumeId) error { v := s.findVolume(i) if v == nil { - return nil + return fmt.Errorf("delete volume %d not found on disk", i) } message := master_pb.VolumeShortInformationMessage{ Id: uint32(v.Id), @@ -334,18 +437,46 @@ func (s *Store) DeleteVolume(i needle.VolumeId) error { ReplicaPlacement: uint32(v.ReplicaPlacement.Byte()), Version: uint32(v.Version()), Ttl: v.Ttl.ToUint32(), + DiskType: string(v.location.DiskType), } for _, location := range s.Locations { - if error := location.deleteVolumeById(i); error == nil { + if err := location.DeleteVolume(i); err == nil { glog.V(0).Infof("DeleteVolume %d", i) s.DeletedVolumesChan <- message return nil + } else { + glog.Errorf("DeleteVolume %d: %v", i, err) } } return fmt.Errorf("volume %d not found on disk", i) } +func (s *Store) ConfigureVolume(i needle.VolumeId, replication string) error { + + for _, location := range s.Locations { + fileInfo, found := location.LocateVolume(i) + if !found { + continue + } + // load, modify, save + baseFileName := strings.TrimSuffix(fileInfo.Name(), filepath.Ext(fileInfo.Name())) + vifFile := filepath.Join(location.Directory, baseFileName+".vif") + volumeInfo, _, _, err := pb.MaybeLoadVolumeInfo(vifFile) + if err != nil { + return fmt.Errorf("volume %d fail to load vif", i) + } + volumeInfo.Replication = replication + err = pb.SaveVolumeInfo(vifFile, volumeInfo) + if err != nil { + return fmt.Errorf("volume %d fail to save vif", i) + } + return nil + } + + return fmt.Errorf("volume %d not found on disk", i) +} + func (s *Store) SetVolumeSizeLimit(x uint64) { 
atomic.StoreUint64(&s.volumeSizeLimit, x) } @@ -353,3 +484,28 @@ func (s *Store) SetVolumeSizeLimit(x uint64) { func (s *Store) GetVolumeSizeLimit() uint64 { return atomic.LoadUint64(&s.volumeSizeLimit) } + +func (s *Store) MaybeAdjustVolumeMax() (hasChanges bool) { + volumeSizeLimit := s.GetVolumeSizeLimit() + if volumeSizeLimit == 0 { + return + } + for _, diskLocation := range s.Locations { + if diskLocation.OriginalMaxVolumeCount == 0 { + currentMaxVolumeCount := diskLocation.MaxVolumeCount + diskStatus := stats.NewDiskStatus(diskLocation.Directory) + unusedSpace := diskLocation.UnUsedSpace(volumeSizeLimit) + unclaimedSpaces := int64(diskStatus.Free) - int64(unusedSpace) + volCount := diskLocation.VolumesLen() + maxVolumeCount := volCount + if unclaimedSpaces > int64(volumeSizeLimit) { + maxVolumeCount += int(uint64(unclaimedSpaces)/volumeSizeLimit) - 1 + } + diskLocation.MaxVolumeCount = maxVolumeCount + glog.V(2).Infof("disk %s max %d unclaimedSpace:%dMB, unused:%dMB volumeSizeLimit:%dMB", + diskLocation.Directory, maxVolumeCount, unclaimedSpaces/1024/1024, unusedSpace/1024/1024, volumeSizeLimit/1024/1024) + hasChanges = hasChanges || currentMaxVolumeCount != diskLocation.MaxVolumeCount + } + } + return +} diff --git a/weed/storage/store_ec.go b/weed/storage/store_ec.go index 27406451f..9702fdd50 100644 --- a/weed/storage/store_ec.go +++ b/weed/storage/store_ec.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "io" + "os" "sort" "sync" "time" @@ -57,8 +58,11 @@ func (s *Store) MountEcShards(collection string, vid needle.VolumeId, shardId er Id: uint32(vid), Collection: collection, EcIndexBits: uint32(shardBits.AddShardId(shardId)), + DiskType: string(location.DiskType), } return nil + } else if err == os.ErrNotExist { + continue } else { return fmt.Errorf("%s load ec shard %d.%d: %v", location.Directory, vid, shardId, err) } @@ -79,6 +83,7 @@ func (s *Store) UnmountEcShards(vid needle.VolumeId, shardId erasure_coding.Shar Id: uint32(vid), Collection: ecShard.Collection, EcIndexBits: uint32(shardBits.AddShardId(shardId)), + DiskType: string(ecShard.DiskType), } for _, location := range s.Locations { @@ -116,7 +121,7 @@ func (s *Store) DestroyEcVolume(vid needle.VolumeId) { } } -func (s *Store) ReadEcShardNeedle(ctx context.Context, vid needle.VolumeId, n *needle.Needle) (int, error) { +func (s *Store) ReadEcShardNeedle(vid needle.VolumeId, n *needle.Needle) (int, error) { for _, location := range s.Locations { if localEcVolume, found := location.FindEcVolume(vid); found { @@ -124,24 +129,24 @@ func (s *Store) ReadEcShardNeedle(ctx context.Context, vid needle.VolumeId, n *n if err != nil { return 0, fmt.Errorf("locate in local ec volume: %v", err) } - if size == types.TombstoneFileSize { - return 0, fmt.Errorf("entry %s is deleted", n.Id) + if size.IsDeleted() { + return 0, ErrorDeleted } - glog.V(3).Infof("read ec volume %d offset %d size %d intervals:%+v", vid, offset.ToAcutalOffset(), size, intervals) + glog.V(3).Infof("read ec volume %d offset %d size %d intervals:%+v", vid, offset.ToActualOffset(), size, intervals) if len(intervals) > 1 { glog.V(3).Infof("ReadEcShardNeedle needle id %s intervals:%+v", n.String(), intervals) } - bytes, isDeleted, err := s.readEcShardIntervals(ctx, vid, n.Id, localEcVolume, intervals) + bytes, isDeleted, err := s.readEcShardIntervals(vid, n.Id, localEcVolume, intervals) if err != nil { return 0, fmt.Errorf("ReadEcShardIntervals: %v", err) } if isDeleted { - return 0, fmt.Errorf("ec entry %s is deleted", n.Id) + return 0, ErrorDeleted } - err = 
n.ReadBytes(bytes, offset.ToAcutalOffset(), size, localEcVolume.Version) + err = n.ReadBytes(bytes, offset.ToActualOffset(), size, localEcVolume.Version) if err != nil { return 0, fmt.Errorf("readbytes: %v", err) } @@ -152,14 +157,14 @@ func (s *Store) ReadEcShardNeedle(ctx context.Context, vid needle.VolumeId, n *n return 0, fmt.Errorf("ec shard %d not found", vid) } -func (s *Store) readEcShardIntervals(ctx context.Context, vid needle.VolumeId, needleId types.NeedleId, ecVolume *erasure_coding.EcVolume, intervals []erasure_coding.Interval) (data []byte, is_deleted bool, err error) { +func (s *Store) readEcShardIntervals(vid needle.VolumeId, needleId types.NeedleId, ecVolume *erasure_coding.EcVolume, intervals []erasure_coding.Interval) (data []byte, is_deleted bool, err error) { - if err = s.cachedLookupEcShardLocations(ctx, ecVolume); err != nil { + if err = s.cachedLookupEcShardLocations(ecVolume); err != nil { return nil, false, fmt.Errorf("failed to locate shard via master grpc %s: %v", s.MasterAddress, err) } for i, interval := range intervals { - if d, isDeleted, e := s.readOneEcShardInterval(ctx, needleId, ecVolume, interval); e != nil { + if d, isDeleted, e := s.readOneEcShardInterval(needleId, ecVolume, interval); e != nil { return nil, isDeleted, e } else { if isDeleted { @@ -175,12 +180,12 @@ func (s *Store) readEcShardIntervals(ctx context.Context, vid needle.VolumeId, n return } -func (s *Store) readOneEcShardInterval(ctx context.Context, needleId types.NeedleId, ecVolume *erasure_coding.EcVolume, interval erasure_coding.Interval) (data []byte, is_deleted bool, err error) { +func (s *Store) readOneEcShardInterval(needleId types.NeedleId, ecVolume *erasure_coding.EcVolume, interval erasure_coding.Interval) (data []byte, is_deleted bool, err error) { shardId, actualOffset := interval.ToShardIdAndOffset(erasure_coding.ErasureCodingLargeBlockSize, erasure_coding.ErasureCodingSmallBlockSize) data = make([]byte, interval.Size) if shard, found := ecVolume.FindEcVolumeShard(shardId); found { if _, err = shard.ReadAt(data, actualOffset); err != nil { - glog.V(0).Infof("read local ec shard %d.%d: %v", ecVolume.VolumeId, shardId, err) + glog.V(0).Infof("read local ec shard %d.%d offset %d: %v", ecVolume.VolumeId, shardId, actualOffset, err) return } } else { @@ -190,16 +195,15 @@ func (s *Store) readOneEcShardInterval(ctx context.Context, needleId types.Needl // try reading directly if hasShardIdLocation { - _, is_deleted, err = s.readRemoteEcShardInterval(ctx, sourceDataNodes, needleId, ecVolume.VolumeId, shardId, data, actualOffset) + _, is_deleted, err = s.readRemoteEcShardInterval(sourceDataNodes, needleId, ecVolume.VolumeId, shardId, data, actualOffset) if err == nil { return } glog.V(0).Infof("clearing ec shard %d.%d locations: %v", ecVolume.VolumeId, shardId, err) - forgetShardId(ecVolume, shardId) } // try reading by recovering from other shards - _, is_deleted, err = s.recoverOneRemoteEcShardInterval(ctx, needleId, ecVolume, shardId, data, actualOffset) + _, is_deleted, err = s.recoverOneRemoteEcShardInterval(needleId, ecVolume, shardId, data, actualOffset) if err == nil { return } @@ -215,7 +219,7 @@ func forgetShardId(ecVolume *erasure_coding.EcVolume, shardId erasure_coding.Sha ecVolume.ShardLocationsLock.Unlock() } -func (s *Store) cachedLookupEcShardLocations(ctx context.Context, ecVolume *erasure_coding.EcVolume) (err error) { +func (s *Store) cachedLookupEcShardLocations(ecVolume *erasure_coding.EcVolume) (err error) { shardCount := len(ecVolume.ShardLocations) if 
shardCount < erasure_coding.DataShardsCount && @@ -234,7 +238,7 @@ func (s *Store) cachedLookupEcShardLocations(ctx context.Context, ecVolume *eras req := &master_pb.LookupEcVolumeRequest{ VolumeId: uint32(ecVolume.VolumeId), } - resp, err := masterClient.LookupEcVolume(ctx, req) + resp, err := masterClient.LookupEcVolume(context.Background(), req) if err != nil { return fmt.Errorf("lookup ec volume %d: %v", ecVolume.VolumeId, err) } @@ -258,7 +262,7 @@ func (s *Store) cachedLookupEcShardLocations(ctx context.Context, ecVolume *eras return } -func (s *Store) readRemoteEcShardInterval(ctx context.Context, sourceDataNodes []string, needleId types.NeedleId, vid needle.VolumeId, shardId erasure_coding.ShardId, buf []byte, offset int64) (n int, is_deleted bool, err error) { +func (s *Store) readRemoteEcShardInterval(sourceDataNodes []string, needleId types.NeedleId, vid needle.VolumeId, shardId erasure_coding.ShardId, buf []byte, offset int64) (n int, is_deleted bool, err error) { if len(sourceDataNodes) == 0 { return 0, false, fmt.Errorf("failed to find ec shard %d.%d", vid, shardId) @@ -266,7 +270,7 @@ func (s *Store) readRemoteEcShardInterval(ctx context.Context, sourceDataNodes [ for _, sourceDataNode := range sourceDataNodes { glog.V(3).Infof("read remote ec shard %d.%d from %s", vid, shardId, sourceDataNode) - n, is_deleted, err = s.doReadRemoteEcShardInterval(ctx, sourceDataNode, needleId, vid, shardId, buf, offset) + n, is_deleted, err = s.doReadRemoteEcShardInterval(sourceDataNode, needleId, vid, shardId, buf, offset) if err == nil { return } @@ -276,12 +280,12 @@ func (s *Store) readRemoteEcShardInterval(ctx context.Context, sourceDataNodes [ return } -func (s *Store) doReadRemoteEcShardInterval(ctx context.Context, sourceDataNode string, needleId types.NeedleId, vid needle.VolumeId, shardId erasure_coding.ShardId, buf []byte, offset int64) (n int, is_deleted bool, err error) { +func (s *Store) doReadRemoteEcShardInterval(sourceDataNode string, needleId types.NeedleId, vid needle.VolumeId, shardId erasure_coding.ShardId, buf []byte, offset int64) (n int, is_deleted bool, err error) { err = operation.WithVolumeServerClient(sourceDataNode, s.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error { // copy data slice - shardReadClient, err := client.VolumeEcShardRead(ctx, &volume_server_pb.VolumeEcShardReadRequest{ + shardReadClient, err := client.VolumeEcShardRead(context.Background(), &volume_server_pb.VolumeEcShardReadRequest{ VolumeId: uint32(vid), ShardId: uint32(shardId), Offset: offset, @@ -298,7 +302,7 @@ func (s *Store) doReadRemoteEcShardInterval(ctx context.Context, sourceDataNode break } if receiveErr != nil { - return fmt.Errorf("receiving ec shard %d.%d from %s: %v", vid, shardId, sourceDataNode, err) + return fmt.Errorf("receiving ec shard %d.%d from %s: %v", vid, shardId, sourceDataNode, receiveErr) } if resp.IsDeleted { is_deleted = true @@ -316,7 +320,7 @@ func (s *Store) doReadRemoteEcShardInterval(ctx context.Context, sourceDataNode return } -func (s *Store) recoverOneRemoteEcShardInterval(ctx context.Context, needleId types.NeedleId, ecVolume *erasure_coding.EcVolume, shardIdToRecover erasure_coding.ShardId, buf []byte, offset int64) (n int, is_deleted bool, err error) { +func (s *Store) recoverOneRemoteEcShardInterval(needleId types.NeedleId, ecVolume *erasure_coding.EcVolume, shardIdToRecover erasure_coding.ShardId, buf []byte, offset int64) (n int, is_deleted bool, err error) { glog.V(3).Infof("recover ec shard %d.%d from other locations", 
ecVolume.VolumeId, shardIdToRecover) enc, err := reedsolomon.New(erasure_coding.DataShardsCount, erasure_coding.ParityShardsCount) @@ -344,7 +348,7 @@ func (s *Store) recoverOneRemoteEcShardInterval(ctx context.Context, needleId ty go func(shardId erasure_coding.ShardId, locations []string) { defer wg.Done() data := make([]byte, len(buf)) - nRead, isDeleted, readErr := s.readRemoteEcShardInterval(ctx, locations, needleId, ecVolume.VolumeId, shardId, data, offset) + nRead, isDeleted, readErr := s.readRemoteEcShardInterval(locations, needleId, ecVolume.VolumeId, shardId, data, offset) if readErr != nil { glog.V(3).Infof("recover: readRemoteEcShardInterval %d.%d %d bytes from %+v: %v", ecVolume.VolumeId, shardId, nRead, locations, readErr) forgetShardId(ecVolume, shardId) diff --git a/weed/storage/store_ec_delete.go b/weed/storage/store_ec_delete.go index e027d2887..4a75fb20b 100644 --- a/weed/storage/store_ec_delete.go +++ b/weed/storage/store_ec_delete.go @@ -12,9 +12,9 @@ import ( "github.com/chrislusf/seaweedfs/weed/storage/types" ) -func (s *Store) DeleteEcShardNeedle(ctx context.Context, ecVolume *erasure_coding.EcVolume, n *needle.Needle, cookie types.Cookie) (int64, error) { +func (s *Store) DeleteEcShardNeedle(ecVolume *erasure_coding.EcVolume, n *needle.Needle, cookie types.Cookie) (int64, error) { - count, err := s.ReadEcShardNeedle(ctx, ecVolume.VolumeId, n) + count, err := s.ReadEcShardNeedle(ecVolume.VolumeId, n) if err != nil { return 0, err @@ -24,7 +24,7 @@ func (s *Store) DeleteEcShardNeedle(ctx context.Context, ecVolume *erasure_codin return 0, fmt.Errorf("unexpected cookie %x", cookie) } - if err = s.doDeleteNeedleFromAtLeastOneRemoteEcShards(ctx, ecVolume, n.Id); err != nil { + if err = s.doDeleteNeedleFromAtLeastOneRemoteEcShards(ecVolume, n.Id); err != nil { return 0, err } @@ -32,7 +32,7 @@ func (s *Store) DeleteEcShardNeedle(ctx context.Context, ecVolume *erasure_codin } -func (s *Store) doDeleteNeedleFromAtLeastOneRemoteEcShards(ctx context.Context, ecVolume *erasure_coding.EcVolume, needleId types.NeedleId) error { +func (s *Store) doDeleteNeedleFromAtLeastOneRemoteEcShards(ecVolume *erasure_coding.EcVolume, needleId types.NeedleId) error { _, _, intervals, err := ecVolume.LocateEcShardNeedle(needleId, ecVolume.Version) @@ -43,13 +43,13 @@ func (s *Store) doDeleteNeedleFromAtLeastOneRemoteEcShards(ctx context.Context, shardId, _ := intervals[0].ToShardIdAndOffset(erasure_coding.ErasureCodingLargeBlockSize, erasure_coding.ErasureCodingSmallBlockSize) hasDeletionSuccess := false - err = s.doDeleteNeedleFromRemoteEcShardServers(ctx, shardId, ecVolume, needleId) + err = s.doDeleteNeedleFromRemoteEcShardServers(shardId, ecVolume, needleId) if err == nil { hasDeletionSuccess = true } for shardId = erasure_coding.DataShardsCount; shardId < erasure_coding.TotalShardsCount; shardId++ { - if parityDeletionError := s.doDeleteNeedleFromRemoteEcShardServers(ctx, shardId, ecVolume, needleId); parityDeletionError == nil { + if parityDeletionError := s.doDeleteNeedleFromRemoteEcShardServers(shardId, ecVolume, needleId); parityDeletionError == nil { hasDeletionSuccess = true } } @@ -62,7 +62,7 @@ func (s *Store) doDeleteNeedleFromAtLeastOneRemoteEcShards(ctx context.Context, } -func (s *Store) doDeleteNeedleFromRemoteEcShardServers(ctx context.Context, shardId erasure_coding.ShardId, ecVolume *erasure_coding.EcVolume, needleId types.NeedleId) error { +func (s *Store) doDeleteNeedleFromRemoteEcShardServers(shardId erasure_coding.ShardId, ecVolume *erasure_coding.EcVolume, needleId 
types.NeedleId) error { ecVolume.ShardLocationsLock.RLock() sourceDataNodes, hasShardLocations := ecVolume.ShardLocations[shardId] @@ -74,7 +74,7 @@ func (s *Store) doDeleteNeedleFromRemoteEcShardServers(ctx context.Context, shar for _, sourceDataNode := range sourceDataNodes { glog.V(4).Infof("delete from remote ec shard %d.%d from %s", ecVolume.VolumeId, shardId, sourceDataNode) - err := s.doDeleteNeedleFromRemoteEcShard(ctx, sourceDataNode, ecVolume.VolumeId, ecVolume.Collection, ecVolume.Version, needleId) + err := s.doDeleteNeedleFromRemoteEcShard(sourceDataNode, ecVolume.VolumeId, ecVolume.Collection, ecVolume.Version, needleId) if err != nil { return err } @@ -85,12 +85,12 @@ func (s *Store) doDeleteNeedleFromRemoteEcShardServers(ctx context.Context, shar } -func (s *Store) doDeleteNeedleFromRemoteEcShard(ctx context.Context, sourceDataNode string, vid needle.VolumeId, collection string, version needle.Version, needleId types.NeedleId) error { +func (s *Store) doDeleteNeedleFromRemoteEcShard(sourceDataNode string, vid needle.VolumeId, collection string, version needle.Version, needleId types.NeedleId) error { return operation.WithVolumeServerClient(sourceDataNode, s.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error { // copy data slice - _, err := client.VolumeEcBlobDelete(ctx, &volume_server_pb.VolumeEcBlobDeleteRequest{ + _, err := client.VolumeEcBlobDelete(context.Background(), &volume_server_pb.VolumeEcBlobDeleteRequest{ VolumeId: uint32(vid), Collection: collection, FileKey: uint64(needleId), diff --git a/weed/storage/store_vacuum.go b/weed/storage/store_vacuum.go index 5dacb71bf..32666a417 100644 --- a/weed/storage/store_vacuum.go +++ b/weed/storage/store_vacuum.go @@ -2,6 +2,7 @@ package storage import ( "fmt" + "github.com/chrislusf/seaweedfs/weed/stats" "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/storage/needle" @@ -16,7 +17,11 @@ func (s *Store) CheckCompactVolume(volumeId needle.VolumeId) (float64, error) { } func (s *Store) CompactVolume(vid needle.VolumeId, preallocate int64, compactionBytePerSecond int64) error { if v := s.findVolume(vid); v != nil { - return v.Compact2(preallocate) // compactionBytePerSecond + s := stats.NewDiskStatus(v.dir) + if int64(s.Free) < preallocate { + return fmt.Errorf("free space: %d bytes, not enough for %d bytes", s.Free, preallocate) + } + return v.Compact2(preallocate, compactionBytePerSecond) } return fmt.Errorf("volume id %d is not found during compact", vid) } diff --git a/weed/storage/super_block/replica_placement.go b/weed/storage/super_block/replica_placement.go index fcccbba7d..a263e6669 100644 --- a/weed/storage/super_block/replica_placement.go +++ b/weed/storage/super_block/replica_placement.go @@ -6,9 +6,9 @@ import ( ) type ReplicaPlacement struct { - SameRackCount int - DiffRackCount int - DiffDataCenterCount int + SameRackCount int `json:"node,omitempty"` + DiffRackCount int `json:"rack,omitempty"` + DiffDataCenterCount int `json:"dc,omitempty"` } func NewReplicaPlacementFromString(t string) (*ReplicaPlacement, error) { @@ -36,6 +36,9 @@ func NewReplicaPlacementFromByte(b byte) (*ReplicaPlacement, error) { } func (rp *ReplicaPlacement) Byte() byte { + if rp == nil { + return 0 + } ret := rp.DiffDataCenterCount*100 + rp.DiffRackCount*10 + rp.SameRackCount return byte(ret) } diff --git a/weed/storage/types/needle_types.go b/weed/storage/types/needle_types.go index 2ebb392db..137b97d7f 100644 --- a/weed/storage/types/needle_types.go +++ 
b/weed/storage/types/needle_types.go
@@ -2,9 +2,9 @@ package types

 import (
 	"fmt"
-	"github.com/chrislusf/seaweedfs/weed/util"
-	"math"
 	"strconv"
+
+	"github.com/chrislusf/seaweedfs/weed/util"
 )

 type Offset struct {
@@ -12,6 +12,15 @@ type Offset struct {
 	OffsetLower
 }

+type Size int32
+
+func (s Size) IsDeleted() bool {
+	return s < 0 || s == TombstoneFileSize
+}
+func (s Size) IsValid() bool {
+	return s > 0 && s != TombstoneFileSize
+}
+
 type OffsetLower struct {
 	b3 byte
 	b2 byte
@@ -27,7 +36,7 @@ const (
 	NeedleMapEntrySize = NeedleIdSize + OffsetSize + SizeSize
 	TimestampSize      = 8 // int64 size
 	NeedlePaddingSize  = 8
-	TombstoneFileSize  = math.MaxUint32
+	TombstoneFileSize  = Size(-1)
 	CookieSize         = 4
 )

@@ -49,3 +58,11 @@ func ParseCookie(cookieString string) (Cookie, error) {
 	}
 	return Cookie(cookie), nil
 }
+
+func BytesToSize(bytes []byte) Size {
+	return Size(util.BytesToUint32(bytes))
+}
+
+func SizeToBytes(bytes []byte, size Size) {
+	util.Uint32toBytes(bytes, uint32(size))
+}
diff --git a/weed/storage/types/offset_4bytes.go b/weed/storage/types/offset_4bytes.go
index 9acd069d3..5348d5b36 100644
--- a/weed/storage/types/offset_4bytes.go
+++ b/weed/storage/types/offset_4bytes.go
@@ -11,8 +11,8 @@ type OffsetHigher struct {
 }

 const (
-	OffsetSize            = 4
-	MaxPossibleVolumeSize = 4 * 1024 * 1024 * 1024 * 8 // 32GB
+	OffsetSize                   = 4
+	MaxPossibleVolumeSize uint64 = 4 * 1024 * 1024 * 1024 * 8 // 32GB
 )

 func OffsetToBytes(bytes []byte, offset Offset) {
@@ -54,7 +54,7 @@ func ToOffset(offset int64) Offset {
 	return Uint32ToOffset(smaller)
 }

-func (offset Offset) ToAcutalOffset() (actualOffset int64) {
+func (offset Offset) ToActualOffset() (actualOffset int64) {
 	return (int64(offset.b0) + int64(offset.b1)<<8 + int64(offset.b2)<<16 + int64(offset.b3)<<24) * int64(NeedlePaddingSize)
 }

diff --git a/weed/storage/types/offset_5bytes.go b/weed/storage/types/offset_5bytes.go
index f57e4f6d4..b6181fc11 100644
--- a/weed/storage/types/offset_5bytes.go
+++ b/weed/storage/types/offset_5bytes.go
@@ -11,8 +11,8 @@ type OffsetHigher struct {
 }

 const (
-	OffsetSize            = 4 + 1
-	MaxPossibleVolumeSize = 4 * 1024 * 1024 * 1024 * 8 * 256 /* 256 is from the extra byte */ // 8TB
+	OffsetSize                   = 4 + 1
+	MaxPossibleVolumeSize uint64 = 4 * 1024 * 1024 * 1024 * 8 * 256 /* 256 is from the extra byte */ // 8TB
 )

 func OffsetToBytes(bytes []byte, offset Offset) {
@@ -71,7 +71,7 @@ func ToOffset(offset int64) Offset {
 	}
 }

-func (offset Offset) ToAcutalOffset() (actualOffset int64) {
+func (offset Offset) ToActualOffset() (actualOffset int64) {
 	return (int64(offset.b0) + int64(offset.b1)<<8 + int64(offset.b2)<<16 + int64(offset.b3)<<24 + int64(offset.b4)<<32) * int64(NeedlePaddingSize)
 }

diff --git a/weed/storage/types/volume_disk_type.go b/weed/storage/types/volume_disk_type.go
new file mode 100644
index 000000000..c9b87d802
--- /dev/null
+++ b/weed/storage/types/volume_disk_type.go
@@ -0,0 +1,40 @@
+package types
+
+import (
+	"strings"
+)
+
+type DiskType string
+
+const (
+	HardDriveType DiskType = ""
+	SsdType                = "ssd"
+)
+
+func ToDiskType(vt string) (diskType DiskType) {
+	vt = strings.ToLower(vt)
+	diskType = HardDriveType
+	switch vt {
+	case "", "hdd":
+		diskType = HardDriveType
+	case "ssd":
+		diskType = SsdType
+	default:
+		diskType = DiskType(vt)
+	}
+	return
+}
+
+func (diskType DiskType) String() string {
+	if diskType == "" {
+		return ""
+	}
+	return string(diskType)
+}
+
+func (diskType DiskType) ReadableString() string {
+	if diskType == "" {
+		return "hdd"
+	}
+	return string(diskType)
+}
diff --git
a/weed/storage/volume.go b/weed/storage/volume.go index acede66bf..e0638d8a8 100644 --- a/weed/storage/volume.go +++ b/weed/storage/volume.go @@ -21,20 +21,23 @@ import ( type Volume struct { Id needle.VolumeId dir string + dirIdx string Collection string DataBackend backend.BackendStorageFile nm NeedleMapper - needleMapKind NeedleMapType + needleMapKind NeedleMapKind noWriteOrDelete bool // if readonly, either noWriteOrDelete or noWriteCanDelete noWriteCanDelete bool // if readonly, either noWriteOrDelete or noWriteCanDelete + noWriteLock sync.RWMutex hasRemoteFile bool // if the volume has a remote file MemoryMapMaxSizeMb uint32 super_block.SuperBlock dataFileAccessLock sync.RWMutex - lastModifiedTsSeconds uint64 //unix time in seconds - lastAppendAtNs uint64 //unix time in nanoseconds + asyncRequestsChan chan *needle.AsyncRequest + lastModifiedTsSeconds uint64 // unix time in seconds + lastAppendAtNs uint64 // unix time in nanoseconds lastCompactIndexOffset uint64 lastCompactRevision uint16 @@ -42,18 +45,26 @@ type Volume struct { isCompacting bool volumeInfo *volume_server_pb.VolumeInfo + location *DiskLocation + + lastIoError error } -func NewVolume(dirname string, collection string, id needle.VolumeId, needleMapKind NeedleMapType, replicaPlacement *super_block.ReplicaPlacement, ttl *needle.TTL, preallocate int64, memoryMapMaxSizeMb uint32) (v *Volume, e error) { +func NewVolume(dirname string, dirIdx string, collection string, id needle.VolumeId, needleMapKind NeedleMapKind, replicaPlacement *super_block.ReplicaPlacement, ttl *needle.TTL, preallocate int64, memoryMapMaxSizeMb uint32) (v *Volume, e error) { // if replicaPlacement is nil, the superblock will be loaded from disk - v = &Volume{dir: dirname, Collection: collection, Id: id, MemoryMapMaxSizeMb: memoryMapMaxSizeMb} + v = &Volume{dir: dirname, dirIdx: dirIdx, Collection: collection, Id: id, MemoryMapMaxSizeMb: memoryMapMaxSizeMb, + asyncRequestsChan: make(chan *needle.AsyncRequest, 128)} v.SuperBlock = super_block.SuperBlock{ReplicaPlacement: replicaPlacement, Ttl: ttl} v.needleMapKind = needleMapKind e = v.load(true, true, needleMapKind, preallocate) + v.startWorker() return } + func (v *Volume) String() string { - return fmt.Sprintf("Id:%v, dir:%s, Collection:%s, dataFile:%v, nm:%v, noWrite:%v canDelete:%v", v.Id, v.dir, v.Collection, v.DataBackend, v.nm, v.noWriteOrDelete || v.noWriteCanDelete, v.noWriteCanDelete) + v.noWriteLock.RLock() + defer v.noWriteLock.RUnlock() + return fmt.Sprintf("Id:%v dir:%s dirIdx:%s Collection:%s dataFile:%v nm:%v noWrite:%v canDelete:%v", v.Id, v.dir, v.dirIdx, v.Collection, v.DataBackend, v.nm, v.noWriteOrDelete || v.noWriteCanDelete, v.noWriteCanDelete) } func VolumeFileName(dir string, collection string, id int) (fileName string) { @@ -65,10 +76,24 @@ func VolumeFileName(dir string, collection string, id int) (fileName string) { } return } -func (v *Volume) FileName() (fileName string) { + +func (v *Volume) DataFileName() (fileName string) { return VolumeFileName(v.dir, v.Collection, int(v.Id)) } +func (v *Volume) IndexFileName() (fileName string) { + return VolumeFileName(v.dirIdx, v.Collection, int(v.Id)) +} + +func (v *Volume) FileName(ext string) (fileName string) { + switch ext { + case ".idx", ".cpx", ".ldb": + return VolumeFileName(v.dirIdx, v.Collection, int(v.Id)) + ext + } + // .dat, .cpd, .vif + return VolumeFileName(v.dir, v.Collection, int(v.Id)) + ext +} + func (v *Volume) Version() needle.Version { if v.volumeInfo.Version != 0 { v.SuperBlock.Version = 
needle.Version(v.volumeInfo.Version) @@ -146,6 +171,10 @@ func (v *Volume) IndexFileSize() uint64 { return v.nm.IndexFileSize() } +func (v *Volume) DiskType() types.DiskType { + return v.location.DiskType +} + // Close cleanly shuts down this volume func (v *Volume) Close() { v.dataFileAccessLock.Lock() @@ -169,20 +198,20 @@ func (v *Volume) NeedToReplicate() bool { // except when volume is empty // or when the volume does not have a ttl // or when volumeSizeLimit is 0 when server just starts -func (v *Volume) expired(volumeSizeLimit uint64) bool { +func (v *Volume) expired(contentSize uint64, volumeSizeLimit uint64) bool { if volumeSizeLimit == 0 { - //skip if we don't know size limit + // skip if we don't know size limit return false } - if v.ContentSize() == 0 { + if contentSize <= super_block.SuperBlockSize { return false } if v.Ttl == nil || v.Ttl.Minutes() == 0 { return false } - glog.V(1).Infof("now:%v lastModified:%v", time.Now().Unix(), v.lastModifiedTsSeconds) + glog.V(2).Infof("volume %d now:%v lastModified:%v", v.Id, time.Now().Unix(), v.lastModifiedTsSeconds) livedMinutes := (time.Now().Unix() - int64(v.lastModifiedTsSeconds)) / 60 - glog.V(1).Infof("ttl:%v lived:%v", v.Ttl, livedMinutes) + glog.V(2).Infof("volume %d ttl:%v lived:%v", v.Id, v.Ttl, livedMinutes) if int64(v.Ttl.Minutes()) < livedMinutes { return true } @@ -205,27 +234,54 @@ func (v *Volume) expiredLongEnough(maxDelayMinutes uint32) bool { return false } -func (v *Volume) ToVolumeInformationMessage() *master_pb.VolumeInformationMessage { - size, _, modTime := v.FileStat() +func (v *Volume) collectStatus() (maxFileKey types.NeedleId, datFileSize int64, modTime time.Time, fileCount, deletedCount, deletedSize uint64, ok bool) { + v.dataFileAccessLock.RLock() + defer v.dataFileAccessLock.RUnlock() + glog.V(3).Infof("collectStatus volume %d", v.Id) - volumInfo := &master_pb.VolumeInformationMessage{ + if v.nm == nil { + return + } + + ok = true + + maxFileKey = v.nm.MaxFileKey() + datFileSize, modTime, _ = v.DataBackend.GetStat() + fileCount = uint64(v.nm.FileCount()) + deletedCount = uint64(v.nm.DeletedCount()) + deletedSize = v.nm.DeletedSize() + fileCount = uint64(v.nm.FileCount()) + + return +} + +func (v *Volume) ToVolumeInformationMessage() (types.NeedleId, *master_pb.VolumeInformationMessage) { + + maxFileKey, volumeSize, modTime, fileCount, deletedCount, deletedSize, ok := v.collectStatus() + + if !ok { + return 0, nil + } + + volumeInfo := &master_pb.VolumeInformationMessage{ Id: uint32(v.Id), - Size: size, + Size: uint64(volumeSize), Collection: v.Collection, - FileCount: v.FileCount(), - DeleteCount: v.DeletedCount(), - DeletedByteCount: v.DeletedSize(), - ReadOnly: v.noWriteOrDelete, + FileCount: fileCount, + DeleteCount: deletedCount, + DeletedByteCount: deletedSize, + ReadOnly: v.IsReadOnly(), ReplicaPlacement: uint32(v.ReplicaPlacement.Byte()), Version: uint32(v.Version()), Ttl: v.Ttl.ToUint32(), CompactRevision: uint32(v.SuperBlock.CompactionRevision), ModifiedAtSecond: modTime.Unix(), + DiskType: string(v.location.DiskType), } - volumInfo.RemoteStorageName, volumInfo.RemoteStorageKey = v.RemoteStorageNameKey() + volumeInfo.RemoteStorageName, volumeInfo.RemoteStorageKey = v.RemoteStorageNameKey() - return volumInfo + return maxFileKey, volumeInfo } func (v *Volume) RemoteStorageNameKey() (storageName, storageKey string) { @@ -237,3 +293,9 @@ func (v *Volume) RemoteStorageNameKey() (storageName, storageKey string) { } return v.volumeInfo.GetFiles()[0].BackendName(), v.volumeInfo.GetFiles()[0].GetKey() } 
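Aside: collectStatus above takes one read lock, copies every counter, and reports ok=false when the needle map is already gone, so ToVolumeInformationMessage can skip half-closed volumes instead of reporting zeros. A self-contained sketch of the same snapshot-with-ok pattern, with generic counters standing in for the real needle map:

package main

import (
	"fmt"
	"sync"
)

// stats guards its counters with a RWMutex; loaded mimics v.nm != nil.
type stats struct {
	mu        sync.RWMutex
	loaded    bool
	fileCount uint64
	size      uint64
}

// snapshot copies all fields under a single read lock; ok=false tells the
// caller to skip this volume rather than treat zeros as real data.
func (s *stats) snapshot() (fileCount, size uint64, ok bool) {
	s.mu.RLock()
	defer s.mu.RUnlock()
	if !s.loaded {
		return 0, 0, false
	}
	return s.fileCount, s.size, true
}

func main() {
	s := &stats{loaded: true, fileCount: 42, size: 4096}
	if fc, sz, ok := s.snapshot(); ok {
		fmt.Println("files:", fc, "bytes:", sz)
	}
}

Copying under one lock keeps the reported fields mutually consistent, which a sequence of individually locked getters would not guarantee.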
+ +func (v *Volume) IsReadOnly() bool { + v.noWriteLock.RLock() + defer v.noWriteLock.RUnlock() + return v.noWriteOrDelete || v.noWriteCanDelete || v.location.isDiskSpaceLow +} diff --git a/weed/storage/volume_backup.go b/weed/storage/volume_backup.go index ec29c895e..82ea12a89 100644 --- a/weed/storage/volume_backup.go +++ b/weed/storage/volume_backup.go @@ -64,8 +64,6 @@ update needle map when receiving new .dat bytes. But seems not necessary now.) func (v *Volume) IncrementalBackup(volumeServer string, grpcDialOption grpc.DialOption) error { - ctx := context.Background() - startFromOffset, _, _ := v.FileStat() appendAtNs, err := v.findLastAppendAtNs() if err != nil { @@ -76,7 +74,7 @@ func (v *Volume) IncrementalBackup(volumeServer string, grpcDialOption grpc.Dial err = operation.WithVolumeServerClient(volumeServer, grpcDialOption, func(client volume_server_pb.VolumeServerClient) error { - stream, err := client.VolumeIncrementalCopy(ctx, &volume_server_pb.VolumeIncrementalCopyRequest{ + stream, err := client.VolumeIncrementalCopy(context.Background(), &volume_server_pb.VolumeIncrementalCopyRequest{ VolumeId: uint32(v.Id), SinceNs: appendAtNs, }) @@ -126,9 +124,9 @@ func (v *Volume) findLastAppendAtNs() (uint64, error) { } func (v *Volume) locateLastAppendEntry() (Offset, error) { - indexFile, e := os.OpenFile(v.FileName()+".idx", os.O_RDONLY, 0644) + indexFile, e := os.OpenFile(v.FileName(".idx"), os.O_RDONLY, 0644) if e != nil { - return Offset{}, fmt.Errorf("cannot read %s.idx: %v", v.FileName(), e) + return Offset{}, fmt.Errorf("cannot read %s: %v", v.FileName(".idx"), e) } defer indexFile.Close() @@ -156,13 +154,13 @@ func (v *Volume) locateLastAppendEntry() (Offset, error) { func (v *Volume) readAppendAtNs(offset Offset) (uint64, error) { - n, _, bodyLength, err := needle.ReadNeedleHeader(v.DataBackend, v.SuperBlock.Version, offset.ToAcutalOffset()) + n, _, bodyLength, err := needle.ReadNeedleHeader(v.DataBackend, v.SuperBlock.Version, offset.ToActualOffset()) if err != nil { - return 0, fmt.Errorf("ReadNeedleHeader: %v", err) + return 0, fmt.Errorf("ReadNeedleHeader %s [%d,%d): %v", v.DataBackend.Name(), offset.ToActualOffset(), offset.ToActualOffset()+NeedleHeaderSize, err) } - _, err = n.ReadNeedleBody(v.DataBackend, v.SuperBlock.Version, offset.ToAcutalOffset()+int64(NeedleHeaderSize), bodyLength) + _, err = n.ReadNeedleBody(v.DataBackend, v.SuperBlock.Version, offset.ToActualOffset()+NeedleHeaderSize, bodyLength) if err != nil { - return 0, fmt.Errorf("ReadNeedleBody offset %d, bodyLength %d: %v", offset.ToAcutalOffset(), bodyLength, err) + return 0, fmt.Errorf("ReadNeedleBody offset %d, bodyLength %d: %v", offset.ToActualOffset(), bodyLength, err) } return n.AppendAtNs, nil @@ -170,25 +168,13 @@ func (v *Volume) readAppendAtNs(offset Offset) (uint64, error) { // on server side func (v *Volume) BinarySearchByAppendAtNs(sinceNs uint64) (offset Offset, isLast bool, err error) { - indexFile, openErr := os.OpenFile(v.FileName()+".idx", os.O_RDONLY, 0644) - if openErr != nil { - err = fmt.Errorf("cannot read %s.idx: %v", v.FileName(), openErr) - return - } - defer indexFile.Close() - fi, statErr := indexFile.Stat() - if statErr != nil { - err = fmt.Errorf("file %s stat error: %v", indexFile.Name(), statErr) - return - } - fileSize := fi.Size() + fileSize := int64(v.IndexFileSize()) if fileSize%NeedleMapEntrySize != 0 { - err = fmt.Errorf("unexpected file %s size: %d", indexFile.Name(), fileSize) + err = fmt.Errorf("unexpected file %s.idx size: %d", v.IndexFileName(), fileSize) 
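Aside: BinarySearchByAppendAtNs above depends on append timestamps being monotonically increasing through the fixed-size .idx entries, so it can bisect instead of scanning. A self-contained sketch of the same bisection over an in-memory slice (the real code reads each probed entry from disk via readOffsetFromIndex):

package main

import "fmt"

// firstAfter returns the index of the first entry whose appendAtNs is
// strictly greater than sinceNs, or len(appendAtNs) if none qualifies.
// This mirrors the l/h loop in BinarySearchByAppendAtNs.
func firstAfter(appendAtNs []uint64, sinceNs uint64) int {
	l, h := 0, len(appendAtNs)
	for l < h {
		m := (l + h) / 2
		if appendAtNs[m] <= sinceNs {
			l = m + 1 // entry m is not newer than sinceNs; search the right half
		} else {
			h = m // entry m qualifies, but an earlier one might too
		}
	}
	return l
}

func main() {
	ts := []uint64{10, 20, 30, 40, 50}
	fmt.Println(firstAfter(ts, 25)) // 2: the entry with appendAtNs=30
	fmt.Println(firstAfter(ts, 50)) // 5: nothing newer; the caller reports isLast
}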
return } - bytes := make([]byte, NeedleMapEntrySize) entryCount := fileSize / NeedleMapEntrySize l := int64(0) h := entryCount @@ -202,7 +188,7 @@ func (v *Volume) BinarySearchByAppendAtNs(sinceNs uint64) (offset Offset, isLast } // read the appendAtNs for entry m - offset, err = v.readAppendAtNsForIndexEntry(indexFile, bytes, m) + offset, err = v.readOffsetFromIndex(m) if err != nil { return } @@ -226,19 +212,21 @@ func (v *Volume) BinarySearchByAppendAtNs(sinceNs uint64) (offset Offset, isLast return Offset{}, true, nil } - offset, err = v.readAppendAtNsForIndexEntry(indexFile, bytes, l) + offset, err = v.readOffsetFromIndex(l) return offset, false, err } // bytes is of size NeedleMapEntrySize -func (v *Volume) readAppendAtNsForIndexEntry(indexFile *os.File, bytes []byte, m int64) (Offset, error) { - if _, readErr := indexFile.ReadAt(bytes, m*NeedleMapEntrySize); readErr != nil && readErr != io.EOF { - return Offset{}, readErr +func (v *Volume) readOffsetFromIndex(m int64) (Offset, error) { + v.dataFileAccessLock.RLock() + defer v.dataFileAccessLock.RUnlock() + if v.nm == nil { + return Offset{}, io.EOF } - _, offset, _ := idx.IdxFileEntry(bytes) - return offset, nil + _, offset, _, err := v.nm.ReadIndexEntry(m) + return offset, err } // generate the volume idx @@ -255,7 +243,7 @@ func (scanner *VolumeFileScanner4GenIdx) ReadNeedleBody() bool { } func (scanner *VolumeFileScanner4GenIdx) VisitNeedle(n *needle.Needle, offset int64, needleHeader, needleBody []byte) error { - if n.Size > 0 && n.Size != TombstoneFileSize { + if n.Size > 0 && n.Size.IsValid() { return scanner.v.nm.Put(n.Id, ToOffset(offset), n.Size) } return scanner.v.nm.Delete(n.Id, ToOffset(offset)) diff --git a/weed/storage/volume_checking.go b/weed/storage/volume_checking.go index a65c2a3ff..b76933083 100644 --- a/weed/storage/volume_checking.go +++ b/weed/storage/volume_checking.go @@ -2,8 +2,11 @@ package storage import ( "fmt" + "github.com/chrislusf/seaweedfs/weed/storage/super_block" + "io" "os" + "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/storage/backend" "github.com/chrislusf/seaweedfs/weed/storage/idx" "github.com/chrislusf/seaweedfs/weed/storage/needle" @@ -11,29 +14,56 @@ import ( "github.com/chrislusf/seaweedfs/weed/util" ) -func CheckVolumeDataIntegrity(v *Volume, indexFile *os.File) (lastAppendAtNs uint64, e error) { +func CheckAndFixVolumeDataIntegrity(v *Volume, indexFile *os.File) (lastAppendAtNs uint64, err error) { var indexSize int64 - if indexSize, e = verifyIndexFileIntegrity(indexFile); e != nil { - return 0, fmt.Errorf("verifyIndexFileIntegrity %s failed: %v", indexFile.Name(), e) + if indexSize, err = verifyIndexFileIntegrity(indexFile); err != nil { + return 0, fmt.Errorf("verifyIndexFileIntegrity %s failed: %v", indexFile.Name(), err) } if indexSize == 0 { return 0, nil } + healthyIndexSize := indexSize + for i := 1; i <= 10 && indexSize >= int64(i)*NeedleMapEntrySize; i++ { + // check and fix last 10 entries + lastAppendAtNs, err = doCheckAndFixVolumeData(v, indexFile, indexSize-int64(i)*NeedleMapEntrySize) + if err == io.EOF { + healthyIndexSize = indexSize - int64(i)*NeedleMapEntrySize + continue + } + if err != ErrorSizeMismatch { + break + } + } + if healthyIndexSize < indexSize { + glog.Warningf("CheckAndFixVolumeDataIntegrity truncate idx file %s from %d to %d", indexFile.Name(), indexSize, healthyIndexSize) + err = indexFile.Truncate(healthyIndexSize) + if err != nil { + glog.Warningf("CheckAndFixVolumeDataIntegrity truncate idx file %s from %d to %d: 
%v", indexFile.Name(), indexSize, healthyIndexSize, err) + } + } + return +} + +func doCheckAndFixVolumeData(v *Volume, indexFile *os.File, indexOffset int64) (lastAppendAtNs uint64, err error) { var lastIdxEntry []byte - if lastIdxEntry, e = readIndexEntryAtOffset(indexFile, indexSize-NeedleMapEntrySize); e != nil { - return 0, fmt.Errorf("readLastIndexEntry %s failed: %v", indexFile.Name(), e) + if lastIdxEntry, err = readIndexEntryAtOffset(indexFile, indexOffset); err != nil { + return 0, fmt.Errorf("readLastIndexEntry %s failed: %v", indexFile.Name(), err) } key, offset, size := idx.IdxFileEntry(lastIdxEntry) if offset.IsZero() { return 0, nil } - if size == TombstoneFileSize { - size = 0 - } - if lastAppendAtNs, e = verifyNeedleIntegrity(v.DataBackend, v.Version(), offset.ToAcutalOffset(), key, size); e != nil { - return lastAppendAtNs, fmt.Errorf("verifyNeedleIntegrity %s failed: %v", indexFile.Name(), e) + if size < 0 { + // read the deletion entry + if lastAppendAtNs, err = verifyDeletedNeedleIntegrity(v.DataBackend, v.Version(), key); err != nil { + return lastAppendAtNs, fmt.Errorf("verifyNeedleIntegrity %s failed: %v", indexFile.Name(), err) + } + } else { + if lastAppendAtNs, err = verifyNeedleIntegrity(v.DataBackend, v.Version(), offset.ToActualOffset(), key, size); err != nil { + return lastAppendAtNs, err + } } - return + return lastAppendAtNs, nil } func verifyIndexFileIntegrity(indexFile *os.File) (indexSize int64, err error) { @@ -55,13 +85,82 @@ func readIndexEntryAtOffset(indexFile *os.File, offset int64) (bytes []byte, err return } -func verifyNeedleIntegrity(datFile backend.BackendStorageFile, v needle.Version, offset int64, key NeedleId, size uint32) (lastAppendAtNs uint64, err error) { - n := new(needle.Needle) +func verifyNeedleIntegrity(datFile backend.BackendStorageFile, v needle.Version, offset int64, key NeedleId, size Size) (lastAppendAtNs uint64, err error) { + n, _, _, err := needle.ReadNeedleHeader(datFile, v, offset) + if err == io.EOF { + return 0, err + } + if err != nil { + return 0, fmt.Errorf("read %s at %d", datFile.Name(), offset) + } + if n.Size != size { + return 0, ErrorSizeMismatch + } + if v == needle.Version3 { + bytes := make([]byte, TimestampSize) + _, err = datFile.ReadAt(bytes, offset+NeedleHeaderSize+int64(size)+needle.NeedleChecksumSize) + if err == io.EOF { + return 0, err + } + if err != nil { + return 0, fmt.Errorf("verifyNeedleIntegrity check %s entry offset %d size %d: %v", datFile.Name(), offset, size, err) + } + n.AppendAtNs = util.BytesToUint64(bytes) + fileTailOffset := offset + needle.GetActualSize(size, v) + fileSize, _, err := datFile.GetStat() + if err != nil { + return 0, fmt.Errorf("stat file %s: %v", datFile.Name(), err) + } + if fileSize == fileTailOffset { + return n.AppendAtNs, nil + } + if fileSize > fileTailOffset { + glog.Warningf("Truncate %s from %d bytes to %d bytes!", datFile.Name(), fileSize, fileTailOffset) + err = datFile.Truncate(fileTailOffset) + if err == nil { + return n.AppendAtNs, nil + } + return n.AppendAtNs, fmt.Errorf("truncate file %s: %v", datFile.Name(), err) + } + glog.Warningf("data file %s has %d bytes, less than expected %d bytes!", datFile.Name(), fileSize, fileTailOffset) + } if err = n.ReadData(datFile, offset, size, v); err != nil { - return n.AppendAtNs, err + return n.AppendAtNs, fmt.Errorf("read data [%d,%d) : %v", offset, offset+int64(size), err) + } + if n.Id != key { + return n.AppendAtNs, fmt.Errorf("index key %#x does not match needle's Id %#x", key, n.Id) + } + return 
n.AppendAtNs, err +} + +func verifyDeletedNeedleIntegrity(datFile backend.BackendStorageFile, v needle.Version, key NeedleId) (lastAppendAtNs uint64, err error) { + n := new(needle.Needle) + size := n.DiskSize(v) + var fileSize int64 + fileSize, _, err = datFile.GetStat() + if err != nil { + return 0, fmt.Errorf("GetStat: %v", err) + } + if err = n.ReadData(datFile, fileSize-size, Size(0), v); err != nil { + return n.AppendAtNs, fmt.Errorf("read data [%d,%d) : %v", fileSize-size, size, err) } if n.Id != key { return n.AppendAtNs, fmt.Errorf("index key %#x does not match needle's Id %#x", key, n.Id) } return n.AppendAtNs, err } + +func (v *Volume) checkIdxFile() error { + datFileSize, _, err := v.DataBackend.GetStat() + if err != nil { + return fmt.Errorf("get stat %s: %v", v.FileName(".dat"), err) + } + if datFileSize <= super_block.SuperBlockSize { + return nil + } + indexFileName := v.FileName(".idx") + if util.FileExists(indexFileName) { + return nil + } + return fmt.Errorf("idx file %s does not exists", indexFileName) +} diff --git a/weed/storage/volume_create.go b/weed/storage/volume_create.go deleted file mode 100644 index ffcb246a4..000000000 --- a/weed/storage/volume_create.go +++ /dev/null @@ -1,21 +0,0 @@ -// +build !linux,!windows - -package storage - -import ( - "os" - - "github.com/chrislusf/seaweedfs/weed/glog" - "github.com/chrislusf/seaweedfs/weed/storage/backend" -) - -func createVolumeFile(fileName string, preallocate int64, memoryMapSizeMB uint32) (backend.BackendStorageFile, error) { - file, e := os.OpenFile(fileName, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0644) - if e != nil { - return nil, e - } - if preallocate > 0 { - glog.V(0).Infof("Preallocated disk space for %s is not supported", fileName) - } - return backend.NewDiskFile(file), nil -} diff --git a/weed/storage/volume_info.go b/weed/storage/volume_info.go index 313818cde..9c64c9682 100644 --- a/weed/storage/volume_info.go +++ b/weed/storage/volume_info.go @@ -14,6 +14,7 @@ type VolumeInfo struct { Size uint64 ReplicaPlacement *super_block.ReplicaPlacement Ttl *needle.TTL + DiskType string Collection string Version needle.Version FileCount int @@ -40,6 +41,7 @@ func NewVolumeInfo(m *master_pb.VolumeInformationMessage) (vi VolumeInfo, err er ModifiedAtSecond: m.ModifiedAtSecond, RemoteStorageName: m.RemoteStorageName, RemoteStorageKey: m.RemoteStorageKey, + DiskType: m.DiskType, } rp, e := super_block.NewReplicaPlacementFromByte(byte(m.ReplicaPlacement)) if e != nil { @@ -62,6 +64,7 @@ func NewVolumeInfoFromShort(m *master_pb.VolumeShortInformationMessage) (vi Volu } vi.ReplicaPlacement = rp vi.Ttl = needle.LoadTTLFromUint32(m.Ttl) + vi.DiskType = m.DiskType return vi, nil } @@ -90,6 +93,7 @@ func (vi VolumeInfo) ToVolumeInformationMessage() *master_pb.VolumeInformationMe ModifiedAtSecond: vi.ModifiedAtSecond, RemoteStorageName: vi.RemoteStorageName, RemoteStorageKey: vi.RemoteStorageKey, + DiskType: vi.DiskType, } } diff --git a/weed/storage/volume_loading.go b/weed/storage/volume_loading.go index fa1f7d617..0cf603ad8 100644 --- a/weed/storage/volume_loading.go +++ b/weed/storage/volume_loading.go @@ -14,7 +14,7 @@ import ( "github.com/chrislusf/seaweedfs/weed/util" ) -func loadVolumeWithoutIndex(dirname string, collection string, id needle.VolumeId, needleMapKind NeedleMapType) (v *Volume, err error) { +func loadVolumeWithoutIndex(dirname string, collection string, id needle.VolumeId, needleMapKind NeedleMapKind) (v *Volume, err error) { v = &Volume{dir: dirname, Collection: collection, Id: id} v.SuperBlock = 
super_block.SuperBlock{} v.needleMapKind = needleMapKind @@ -22,31 +22,42 @@ func loadVolumeWithoutIndex(dirname string, collection string, id needle.VolumeI return } -func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool, needleMapKind NeedleMapType, preallocate int64) (err error) { - fileName := v.FileName() +func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool, needleMapKind NeedleMapKind, preallocate int64) (err error) { alreadyHasSuperBlock := false - if !v.maybeLoadVolumeInfo() { - v.SaveVolumeInfo() - } + hasLoadedVolume := false + defer func() { + if !hasLoadedVolume { + if v.nm != nil { + v.nm.Close() + v.nm = nil + } + if v.DataBackend != nil { + v.DataBackend.Close() + v.DataBackend = nil + } + } + }() + + hasVolumeInfoFile := v.maybeLoadVolumeInfo() if v.HasRemoteFile() { v.noWriteCanDelete = true v.noWriteOrDelete = false - glog.V(0).Infof("loading volume %d from remote %v", v.Id, v.volumeInfo.Files) + glog.V(0).Infof("loading volume %d from remote %v", v.Id, v.volumeInfo) v.LoadRemoteFile() alreadyHasSuperBlock = true - } else if exists, canRead, canWrite, modifiedTime, fileSize := util.CheckFile(fileName + ".dat"); exists { + } else if exists, canRead, canWrite, modifiedTime, fileSize := util.CheckFile(v.FileName(".dat")); exists { // open dat file if !canRead { - return fmt.Errorf("cannot read Volume Data file %s.dat", fileName) + return fmt.Errorf("cannot read Volume Data file %s", v.FileName(".dat")) } var dataFile *os.File if canWrite { - dataFile, err = os.OpenFile(fileName+".dat", os.O_RDWR|os.O_CREATE, 0644) + dataFile, err = os.OpenFile(v.FileName(".dat"), os.O_RDWR|os.O_CREATE, 0644) } else { - glog.V(0).Infoln("opening " + fileName + ".dat in READONLY mode") - dataFile, err = os.Open(fileName + ".dat") + glog.V(0).Infof("opening %s in READONLY mode", v.FileName(".dat")) + dataFile, err = os.Open(v.FileName(".dat")) v.noWriteOrDelete = true } v.lastModifiedTsSeconds = uint64(modifiedTime.Unix()) @@ -56,92 +67,117 @@ func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool, needleMapKind v.DataBackend = backend.NewDiskFile(dataFile) } else { if createDatIfMissing { - v.DataBackend, err = createVolumeFile(fileName+".dat", preallocate, v.MemoryMapMaxSizeMb) + v.DataBackend, err = backend.CreateVolumeFile(v.FileName(".dat"), preallocate, v.MemoryMapMaxSizeMb) } else { - return fmt.Errorf("Volume Data file %s.dat does not exist.", fileName) + return fmt.Errorf("volume data file %s does not exist", v.FileName(".dat")) } } if err != nil { if !os.IsPermission(err) { - return fmt.Errorf("cannot load Volume Data %s.dat: %v", fileName, err) + return fmt.Errorf("cannot load volume data %s: %v", v.FileName(".dat"), err) } else { - return fmt.Errorf("load data file %s.dat: %v", fileName, err) + return fmt.Errorf("load data file %s: %v", v.FileName(".dat"), err) } } if alreadyHasSuperBlock { err = v.readSuperBlock() + glog.V(0).Infof("readSuperBlock volume %d version %v", v.Id, v.SuperBlock.Version) + if v.HasRemoteFile() { + // maybe temporary network problem + glog.Errorf("readSuperBlock remote volume %d: %v", v.Id, err) + err = nil + } } else { if !v.SuperBlock.Initialized() { - return fmt.Errorf("volume %s.dat not initialized", fileName) + return fmt.Errorf("volume %s not initialized", v.FileName(".dat")) } err = v.maybeWriteSuperBlock() } if err == nil && alsoLoadIndex { + // adjust for existing volumes with .idx together with .dat files + if v.dirIdx != v.dir { + if util.FileExists(v.DataFileName() + ".idx") { + v.dirIdx = v.dir + } + } + // 
check volume idx files + if err := v.checkIdxFile(); err != nil { + glog.Fatalf("check volume idx file %s: %v", v.FileName(".idx"), err) + } var indexFile *os.File if v.noWriteOrDelete { - glog.V(0).Infoln("open to read file", fileName+".idx") - if indexFile, err = os.OpenFile(fileName+".idx", os.O_RDONLY, 0644); err != nil { - return fmt.Errorf("cannot read Volume Index %s.idx: %v", fileName, err) + glog.V(0).Infoln("open to read file", v.FileName(".idx")) + if indexFile, err = os.OpenFile(v.FileName(".idx"), os.O_RDONLY, 0644); err != nil { + return fmt.Errorf("cannot read Volume Index %s: %v", v.FileName(".idx"), err) } } else { - glog.V(1).Infoln("open to write file", fileName+".idx") - if indexFile, err = os.OpenFile(fileName+".idx", os.O_RDWR|os.O_CREATE, 0644); err != nil { - return fmt.Errorf("cannot write Volume Index %s.idx: %v", fileName, err) + glog.V(1).Infoln("open to write file", v.FileName(".idx")) + if indexFile, err = os.OpenFile(v.FileName(".idx"), os.O_RDWR|os.O_CREATE, 0644); err != nil { + return fmt.Errorf("cannot write Volume Index %s: %v", v.FileName(".idx"), err) } } - if v.lastAppendAtNs, err = CheckVolumeDataIntegrity(v, indexFile); err != nil { + if v.lastAppendAtNs, err = CheckAndFixVolumeDataIntegrity(v, indexFile); err != nil { v.noWriteOrDelete = true glog.V(0).Infof("volumeDataIntegrityChecking failed %v", err) } if v.noWriteOrDelete || v.noWriteCanDelete { - if v.nm, err = NewSortedFileNeedleMap(fileName, indexFile); err != nil { - glog.V(0).Infof("loading sorted db %s error: %v", fileName+".sdx", err) + if v.nm, err = NewSortedFileNeedleMap(v.IndexFileName(), indexFile); err != nil { + glog.V(0).Infof("loading sorted db %s error: %v", v.FileName(".sdx"), err) } } else { switch needleMapKind { case NeedleMapInMemory: - glog.V(0).Infoln("loading index", fileName+".idx", "to memory") + glog.V(0).Infoln("loading index", v.FileName(".idx"), "to memory") if v.nm, err = LoadCompactNeedleMap(indexFile); err != nil { - glog.V(0).Infof("loading index %s to memory error: %v", fileName+".idx", err) + glog.V(0).Infof("loading index %s to memory error: %v", v.FileName(".idx"), err) } case NeedleMapLevelDb: - glog.V(0).Infoln("loading leveldb", fileName+".ldb") + glog.V(0).Infoln("loading leveldb", v.FileName(".ldb")) opts := &opt.Options{ BlockCacheCapacity: 2 * 1024 * 1024, // default value is 8MiB WriteBuffer: 1 * 1024 * 1024, // default value is 4MiB CompactionTableSizeMultiplier: 10, // default value is 1 } - if v.nm, err = NewLevelDbNeedleMap(fileName+".ldb", indexFile, opts); err != nil { - glog.V(0).Infof("loading leveldb %s error: %v", fileName+".ldb", err) + if v.nm, err = NewLevelDbNeedleMap(v.FileName(".ldb"), indexFile, opts); err != nil { + glog.V(0).Infof("loading leveldb %s error: %v", v.FileName(".ldb"), err) } case NeedleMapLevelDbMedium: - glog.V(0).Infoln("loading leveldb medium", fileName+".ldb") + glog.V(0).Infoln("loading leveldb medium", v.FileName(".ldb")) opts := &opt.Options{ BlockCacheCapacity: 4 * 1024 * 1024, // default value is 8MiB WriteBuffer: 2 * 1024 * 1024, // default value is 4MiB CompactionTableSizeMultiplier: 10, // default value is 1 } - if v.nm, err = NewLevelDbNeedleMap(fileName+".ldb", indexFile, opts); err != nil { - glog.V(0).Infof("loading leveldb %s error: %v", fileName+".ldb", err) + if v.nm, err = NewLevelDbNeedleMap(v.FileName(".ldb"), indexFile, opts); err != nil { + glog.V(0).Infof("loading leveldb %s error: %v", v.FileName(".ldb"), err) } case NeedleMapLevelDbLarge: - glog.V(0).Infoln("loading leveldb large", 
fileName+".ldb") + glog.V(0).Infoln("loading leveldb large", v.FileName(".ldb")) opts := &opt.Options{ BlockCacheCapacity: 8 * 1024 * 1024, // default value is 8MiB WriteBuffer: 4 * 1024 * 1024, // default value is 4MiB CompactionTableSizeMultiplier: 10, // default value is 1 } - if v.nm, err = NewLevelDbNeedleMap(fileName+".ldb", indexFile, opts); err != nil { - glog.V(0).Infof("loading leveldb %s error: %v", fileName+".ldb", err) + if v.nm, err = NewLevelDbNeedleMap(v.FileName(".ldb"), indexFile, opts); err != nil { + glog.V(0).Infof("loading leveldb %s error: %v", v.FileName(".ldb"), err) } } } } + if !hasVolumeInfoFile { + v.volumeInfo.Version = uint32(v.SuperBlock.Version) + v.SaveVolumeInfo() + } + stats.VolumeServerVolumeCounter.WithLabelValues(v.Collection, "volume").Inc() + if err == nil { + hasLoadedVolume = true + } + return err } diff --git a/weed/storage/volume_read.go b/weed/storage/volume_read.go new file mode 100644 index 000000000..f689eeec0 --- /dev/null +++ b/weed/storage/volume_read.go @@ -0,0 +1,131 @@ +package storage + +import ( + "fmt" + "io" + "time" + + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/storage/backend" + "github.com/chrislusf/seaweedfs/weed/storage/needle" + "github.com/chrislusf/seaweedfs/weed/storage/super_block" + . "github.com/chrislusf/seaweedfs/weed/storage/types" +) + +// read fills in Needle content by looking up n.Id from NeedleMapper +func (v *Volume) readNeedle(n *needle.Needle, readOption *ReadOption) (int, error) { + v.dataFileAccessLock.RLock() + defer v.dataFileAccessLock.RUnlock() + + nv, ok := v.nm.Get(n.Id) + if !ok || nv.Offset.IsZero() { + return -1, ErrorNotFound + } + readSize := nv.Size + if readSize.IsDeleted() { + if readOption != nil && readOption.ReadDeleted && readSize != TombstoneFileSize { + glog.V(3).Infof("reading deleted %s", n.String()) + readSize = -readSize + } else { + return -1, ErrorDeleted + } + } + if readSize == 0 { + return 0, nil + } + err := n.ReadData(v.DataBackend, nv.Offset.ToActualOffset(), readSize, v.Version()) + if err == needle.ErrorSizeMismatch && OffsetSize == 4 { + err = n.ReadData(v.DataBackend, nv.Offset.ToActualOffset()+int64(MaxPossibleVolumeSize), readSize, v.Version()) + } + v.checkReadWriteError(err) + if err != nil { + return 0, err + } + bytesRead := len(n.Data) + if !n.HasTtl() { + return bytesRead, nil + } + ttlMinutes := n.Ttl.Minutes() + if ttlMinutes == 0 { + return bytesRead, nil + } + if !n.HasLastModifiedDate() { + return bytesRead, nil + } + if time.Now().Before(time.Unix(0, int64(n.AppendAtNs)).Add(time.Duration(ttlMinutes) * time.Minute)) { + return bytesRead, nil + } + return -1, ErrorNotFound +} + +// read fills in Needle content by looking up n.Id from NeedleMapper +func (v *Volume) ReadNeedleBlob(offset int64, size Size) ([]byte, error) { + v.dataFileAccessLock.RLock() + defer v.dataFileAccessLock.RUnlock() + + return needle.ReadNeedleBlob(v.DataBackend, offset, size, v.Version()) +} + +type VolumeFileScanner interface { + VisitSuperBlock(super_block.SuperBlock) error + ReadNeedleBody() bool + VisitNeedle(n *needle.Needle, offset int64, needleHeader, needleBody []byte) error +} + +func ScanVolumeFile(dirname string, collection string, id needle.VolumeId, + needleMapKind NeedleMapKind, + volumeFileScanner VolumeFileScanner) (err error) { + var v *Volume + if v, err = loadVolumeWithoutIndex(dirname, collection, id, needleMapKind); err != nil { + return fmt.Errorf("failed to load volume %d: %v", id, err) + } + if err = 
volumeFileScanner.VisitSuperBlock(v.SuperBlock); err != nil { + return fmt.Errorf("failed to process volume %d super block: %v", id, err) + } + defer v.Close() + + version := v.Version() + + offset := int64(v.SuperBlock.BlockSize()) + + return ScanVolumeFileFrom(version, v.DataBackend, offset, volumeFileScanner) +} + +func ScanVolumeFileFrom(version needle.Version, datBackend backend.BackendStorageFile, offset int64, volumeFileScanner VolumeFileScanner) (err error) { + n, nh, rest, e := needle.ReadNeedleHeader(datBackend, version, offset) + if e != nil { + if e == io.EOF { + return nil + } + return fmt.Errorf("cannot read %s at offset %d: %v", datBackend.Name(), offset, e) + } + for n != nil { + var needleBody []byte + if volumeFileScanner.ReadNeedleBody() { + // println("needle", n.Id.String(), "offset", offset, "size", n.Size, "rest", rest) + if needleBody, err = n.ReadNeedleBody(datBackend, version, offset+NeedleHeaderSize, rest); err != nil { + glog.V(0).Infof("cannot read needle head [%d, %d) body [%d, %d) body length %d: %v", offset, offset+NeedleHeaderSize, offset+NeedleHeaderSize, offset+NeedleHeaderSize+rest, rest, err) + // err = fmt.Errorf("cannot read needle body: %v", err) + // return + } + } + err := volumeFileScanner.VisitNeedle(n, offset, nh, needleBody) + if err == io.EOF { + return nil + } + if err != nil { + glog.V(0).Infof("visit needle error: %v", err) + return fmt.Errorf("visit needle error: %v", err) + } + offset += NeedleHeaderSize + rest + glog.V(4).Infof("==> new entry offset %d", offset) + if n, nh, rest, err = needle.ReadNeedleHeader(datBackend, version, offset); err != nil { + if err == io.EOF { + return nil + } + return fmt.Errorf("cannot read needle header at offset %d: %v", offset, err) + } + glog.V(4).Infof("new entry needle size:%d rest:%d", n.Size, rest) + } + return nil +} diff --git a/weed/storage/volume_read_write.go b/weed/storage/volume_read_write.go deleted file mode 100644 index 0aa3f794b..000000000 --- a/weed/storage/volume_read_write.go +++ /dev/null @@ -1,237 +0,0 @@ -package storage - -import ( - "bytes" - "errors" - "fmt" - "io" - "os" - "time" - - "github.com/chrislusf/seaweedfs/weed/glog" - "github.com/chrislusf/seaweedfs/weed/storage/backend" - "github.com/chrislusf/seaweedfs/weed/storage/needle" - "github.com/chrislusf/seaweedfs/weed/storage/super_block" - . "github.com/chrislusf/seaweedfs/weed/storage/types" -) - -var ErrorNotFound = errors.New("not found") - -// isFileUnchanged checks whether this needle to write is same as last one. -// It requires serialized access in the same volume. 
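Aside: the isFileUnchanged check below (removed from this file by the patch and carried forward elsewhere) avoids rewriting identical content by comparing the cheap fields first, cookie, then checksum, and only then the raw bytes. A self-contained sketch of that cheap-to-expensive ordering, with a plain CRC standing in for the needle checksum:

package main

import (
	"bytes"
	"fmt"
	"hash/crc32"
)

type stored struct {
	cookie   uint32
	checksum uint32
	data     []byte
}

// unchanged short-circuits on cookie and checksum mismatches and falls
// back to a full byte comparison only when both already match.
func unchanged(old stored, cookie uint32, data []byte) bool {
	if old.cookie != cookie {
		return false
	}
	if old.checksum != crc32.ChecksumIEEE(data) {
		return false
	}
	return bytes.Equal(old.data, data)
}

func main() {
	payload := []byte("hello")
	old := stored{cookie: 7, checksum: crc32.ChecksumIEEE(payload), data: payload}
	fmt.Println(unchanged(old, 7, []byte("hello"))) // true: skip the rewrite
	fmt.Println(unchanged(old, 7, []byte("world"))) // false: content differs
}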
-func (v *Volume) isFileUnchanged(n *needle.Needle) bool { - if v.Ttl.String() != "" { - return false - } - - nv, ok := v.nm.Get(n.Id) - if ok && !nv.Offset.IsZero() && nv.Size != TombstoneFileSize { - oldNeedle := new(needle.Needle) - err := oldNeedle.ReadData(v.DataBackend, nv.Offset.ToAcutalOffset(), nv.Size, v.Version()) - if err != nil { - glog.V(0).Infof("Failed to check updated file at offset %d size %d: %v", nv.Offset.ToAcutalOffset(), nv.Size, err) - return false - } - if oldNeedle.Cookie == n.Cookie && oldNeedle.Checksum == n.Checksum && bytes.Equal(oldNeedle.Data, n.Data) { - n.DataSize = oldNeedle.DataSize - return true - } - } - return false -} - -// Destroy removes everything related to this volume -func (v *Volume) Destroy() (err error) { - if v.isCompacting { - err = fmt.Errorf("volume %d is compacting", v.Id) - return - } - storageName, storageKey := v.RemoteStorageNameKey() - if v.HasRemoteFile() && storageName != "" && storageKey != "" { - if backendStorage, found := backend.BackendStorages[storageName]; found { - backendStorage.DeleteFile(storageKey) - } - } - v.Close() - os.Remove(v.FileName() + ".dat") - os.Remove(v.FileName() + ".idx") - os.Remove(v.FileName() + ".vif") - os.Remove(v.FileName() + ".sdx") - os.Remove(v.FileName() + ".cpd") - os.Remove(v.FileName() + ".cpx") - os.RemoveAll(v.FileName() + ".ldb") - return -} - -func (v *Volume) writeNeedle(n *needle.Needle) (offset uint64, size uint32, isUnchanged bool, err error) { - glog.V(4).Infof("writing needle %s", needle.NewFileIdFromNeedle(v.Id, n).String()) - v.dataFileAccessLock.Lock() - defer v.dataFileAccessLock.Unlock() - if v.isFileUnchanged(n) { - size = n.DataSize - isUnchanged = true - return - } - - if n.Ttl == needle.EMPTY_TTL && v.Ttl != needle.EMPTY_TTL { - n.SetHasTtl() - n.Ttl = v.Ttl - } - - // check whether existing needle cookie matches - nv, ok := v.nm.Get(n.Id) - if ok { - existingNeedle, _, _, existingNeedleReadErr := needle.ReadNeedleHeader(v.DataBackend, v.Version(), nv.Offset.ToAcutalOffset()) - if existingNeedleReadErr != nil { - err = fmt.Errorf("reading existing needle: %v", existingNeedleReadErr) - return - } - if existingNeedle.Cookie != n.Cookie { - glog.V(0).Infof("write cookie mismatch: existing %x, new %x", existingNeedle.Cookie, n.Cookie) - err = fmt.Errorf("mismatching cookie %x", n.Cookie) - return - } - } - - // append to dat file - n.AppendAtNs = uint64(time.Now().UnixNano()) - if offset, size, _, err = n.Append(v.DataBackend, v.Version()); err != nil { - return - } - v.lastAppendAtNs = n.AppendAtNs - - // add to needle map - if !ok || uint64(nv.Offset.ToAcutalOffset()) < offset { - if err = v.nm.Put(n.Id, ToOffset(int64(offset)), n.Size); err != nil { - glog.V(4).Infof("failed to save in needle map %d: %v", n.Id, err) - } - } - if v.lastModifiedTsSeconds < n.LastModified { - v.lastModifiedTsSeconds = n.LastModified - } - return -} - -func (v *Volume) deleteNeedle(n *needle.Needle) (uint32, error) { - glog.V(4).Infof("delete needle %s", needle.NewFileIdFromNeedle(v.Id, n).String()) - v.dataFileAccessLock.Lock() - defer v.dataFileAccessLock.Unlock() - nv, ok := v.nm.Get(n.Id) - //fmt.Println("key", n.Id, "volume offset", nv.Offset, "data_size", n.Size, "cached size", nv.Size) - if ok && nv.Size != TombstoneFileSize { - size := nv.Size - n.Data = nil - n.AppendAtNs = uint64(time.Now().UnixNano()) - offset, _, _, err := n.Append(v.DataBackend, v.Version()) - if err != nil { - return size, err - } - v.lastAppendAtNs = n.AppendAtNs - if err = v.nm.Delete(n.Id, 
ToOffset(int64(offset))); err != nil { - return size, err - } - return size, err - } - return 0, nil -} - -// read fills in Needle content by looking up n.Id from NeedleMapper -func (v *Volume) readNeedle(n *needle.Needle) (int, error) { - v.dataFileAccessLock.RLock() - defer v.dataFileAccessLock.RUnlock() - - nv, ok := v.nm.Get(n.Id) - if !ok || nv.Offset.IsZero() { - return -1, ErrorNotFound - } - if nv.Size == TombstoneFileSize { - return -1, errors.New("already deleted") - } - if nv.Size == 0 { - return 0, nil - } - err := n.ReadData(v.DataBackend, nv.Offset.ToAcutalOffset(), nv.Size, v.Version()) - if err != nil { - return 0, err - } - bytesRead := len(n.Data) - if !n.HasTtl() { - return bytesRead, nil - } - ttlMinutes := n.Ttl.Minutes() - if ttlMinutes == 0 { - return bytesRead, nil - } - if !n.HasLastModifiedDate() { - return bytesRead, nil - } - if uint64(time.Now().Unix()) < n.LastModified+uint64(ttlMinutes*60) { - return bytesRead, nil - } - return -1, ErrorNotFound -} - -type VolumeFileScanner interface { - VisitSuperBlock(super_block.SuperBlock) error - ReadNeedleBody() bool - VisitNeedle(n *needle.Needle, offset int64, needleHeader, needleBody []byte) error -} - -func ScanVolumeFile(dirname string, collection string, id needle.VolumeId, - needleMapKind NeedleMapType, - volumeFileScanner VolumeFileScanner) (err error) { - var v *Volume - if v, err = loadVolumeWithoutIndex(dirname, collection, id, needleMapKind); err != nil { - return fmt.Errorf("failed to load volume %d: %v", id, err) - } - if v.volumeInfo.Version == 0 { - if err = volumeFileScanner.VisitSuperBlock(v.SuperBlock); err != nil { - return fmt.Errorf("failed to process volume %d super block: %v", id, err) - } - } - defer v.Close() - - version := v.Version() - - offset := int64(v.SuperBlock.BlockSize()) - - return ScanVolumeFileFrom(version, v.DataBackend, offset, volumeFileScanner) -} - -func ScanVolumeFileFrom(version needle.Version, datBackend backend.BackendStorageFile, offset int64, volumeFileScanner VolumeFileScanner) (err error) { - n, nh, rest, e := needle.ReadNeedleHeader(datBackend, version, offset) - if e != nil { - if e == io.EOF { - return nil - } - return fmt.Errorf("cannot read %s at offset %d: %v", datBackend.Name(), offset, e) - } - for n != nil { - var needleBody []byte - if volumeFileScanner.ReadNeedleBody() { - if needleBody, err = n.ReadNeedleBody(datBackend, version, offset+NeedleHeaderSize, rest); err != nil { - glog.V(0).Infof("cannot read needle body: %v", err) - //err = fmt.Errorf("cannot read needle body: %v", err) - //return - } - } - err := volumeFileScanner.VisitNeedle(n, offset, nh, needleBody) - if err == io.EOF { - return nil - } - if err != nil { - glog.V(0).Infof("visit needle error: %v", err) - return fmt.Errorf("visit needle error: %v", err) - } - offset += NeedleHeaderSize + rest - glog.V(4).Infof("==> new entry offset %d", offset) - if n, nh, rest, err = needle.ReadNeedleHeader(datBackend, version, offset); err != nil { - if err == io.EOF { - return nil - } - return fmt.Errorf("cannot read needle header at offset %d: %v", offset, err) - } - glog.V(4).Infof("new entry needle size:%d rest:%d", n.Size, rest) - } - return nil -} diff --git a/weed/storage/volume_stream_write.go b/weed/storage/volume_stream_write.go new file mode 100644 index 000000000..d229bdf20 --- /dev/null +++ b/weed/storage/volume_stream_write.go @@ -0,0 +1,104 @@ +package storage + +import ( + "bufio" + "fmt" + "github.com/chrislusf/seaweedfs/weed/util" + "io" + "time" + + 
"github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/storage/backend" + "github.com/chrislusf/seaweedfs/weed/storage/needle" + . "github.com/chrislusf/seaweedfs/weed/storage/types" +) + +func (v *Volume) StreamWrite(n *needle.Needle, data io.Reader, dataSize uint32) (err error) { + + v.dataFileAccessLock.Lock() + defer v.dataFileAccessLock.Unlock() + + df, ok := v.DataBackend.(*backend.DiskFile) + if !ok { + return fmt.Errorf("unexpected volume backend") + } + offset, _, _ := v.DataBackend.GetStat() + + header := make([]byte, NeedleHeaderSize+TimestampSize) // adding timestamp to reuse it and avoid extra allocation + CookieToBytes(header[0:CookieSize], n.Cookie) + NeedleIdToBytes(header[CookieSize:CookieSize+NeedleIdSize], n.Id) + n.Size = 4 + Size(dataSize) + 1 + SizeToBytes(header[CookieSize+NeedleIdSize:CookieSize+NeedleIdSize+SizeSize], n.Size) + + n.DataSize = dataSize + + // needle header + df.Write(header[0:NeedleHeaderSize]) + + // data size and data + util.Uint32toBytes(header[0:4], n.DataSize) + df.Write(header[0:4]) + // write and calculate CRC + crcWriter := needle.NewCRCwriter(df) + io.Copy(crcWriter, io.LimitReader(data, int64(dataSize))) + + // flags + util.Uint8toBytes(header[0:1], n.Flags) + df.Write(header[0:1]) + + // data checksum + util.Uint32toBytes(header[0:needle.NeedleChecksumSize], crcWriter.Sum()) + // write timestamp, padding + n.AppendAtNs = uint64(time.Now().UnixNano()) + util.Uint64toBytes(header[needle.NeedleChecksumSize:needle.NeedleChecksumSize+TimestampSize], n.AppendAtNs) + padding := needle.PaddingLength(n.Size, needle.Version3) + df.Write(header[0 : needle.NeedleChecksumSize+TimestampSize+padding]) + + // add to needle map + if err = v.nm.Put(n.Id, ToOffset(int64(offset)), n.Size); err != nil { + glog.V(4).Infof("failed to save in needle map %d: %v", n.Id, err) + } + return +} + +func (v *Volume) StreamRead(n *needle.Needle, writer io.Writer) (err error) { + + v.dataFileAccessLock.Lock() + defer v.dataFileAccessLock.Unlock() + + nv, ok := v.nm.Get(n.Id) + if !ok || nv.Offset.IsZero() { + return ErrorNotFound + } + + sr := &StreamReader{ + readerAt: v.DataBackend, + offset: nv.Offset.ToActualOffset(), + } + bufReader := bufio.NewReader(sr) + bufReader.Discard(NeedleHeaderSize) + sizeBuf := make([]byte, 4) + bufReader.Read(sizeBuf) + if _, err = writer.Write(sizeBuf); err != nil { + return err + } + dataSize := util.BytesToUint32(sizeBuf) + + _, err = io.Copy(writer, io.LimitReader(bufReader, int64(dataSize))) + + return +} + +type StreamReader struct { + offset int64 + readerAt io.ReaderAt +} + +func (sr *StreamReader) Read(p []byte) (n int, err error) { + n, err = sr.readerAt.ReadAt(p, sr.offset) + if err != nil { + return + } + sr.offset += int64(n) + return +} diff --git a/weed/storage/volume_super_block.go b/weed/storage/volume_super_block.go index 61c09d85a..20223ac1b 100644 --- a/weed/storage/volume_super_block.go +++ b/weed/storage/volume_super_block.go @@ -1,6 +1,7 @@ package storage import ( + "fmt" "os" "github.com/chrislusf/seaweedfs/weed/glog" @@ -25,8 +26,10 @@ func (v *Volume) maybeWriteSuperBlock() error { if dataFile, e = os.Create(v.DataBackend.Name()); e == nil { v.DataBackend = backend.NewDiskFile(dataFile) if _, e = v.DataBackend.WriteAt(v.SuperBlock.Bytes(), 0); e == nil { + v.noWriteLock.Lock() v.noWriteOrDelete = false v.noWriteCanDelete = false + v.noWriteLock.Unlock() } } } @@ -36,5 +39,12 @@ func (v *Volume) maybeWriteSuperBlock() error { func (v *Volume) readSuperBlock() (err error) { v.SuperBlock, 
err = super_block.ReadSuperBlock(v.DataBackend) + if v.volumeInfo != nil && v.volumeInfo.Replication != "" { + if replication, err := super_block.NewReplicaPlacementFromString(v.volumeInfo.Replication); err != nil { + return fmt.Errorf("Error parse volume %d replication %s : %v", v.Id, v.volumeInfo.Replication, err) + } else { + v.SuperBlock.ReplicaPlacement = replication + } + } return err } diff --git a/weed/storage/volume_tier.go b/weed/storage/volume_tier.go index 99071285f..23160906b 100644 --- a/weed/storage/volume_tier.go +++ b/weed/storage/volume_tier.go @@ -6,6 +6,7 @@ import ( "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb" "github.com/chrislusf/seaweedfs/weed/storage/backend" _ "github.com/chrislusf/seaweedfs/weed/storage/backend/s3_backend" + "github.com/chrislusf/seaweedfs/weed/storage/needle" ) func (v *Volume) GetVolumeInfo() *volume_server_pb.VolumeInfo { @@ -14,12 +15,21 @@ func (v *Volume) GetVolumeInfo() *volume_server_pb.VolumeInfo { func (v *Volume) maybeLoadVolumeInfo() (found bool) { - v.volumeInfo, found = pb.MaybeLoadVolumeInfo(v.FileName() + ".vif") + var err error + v.volumeInfo, v.hasRemoteFile, found, err = pb.MaybeLoadVolumeInfo(v.FileName(".vif")) - if found { + if v.volumeInfo.Version == 0 { + v.volumeInfo.Version = uint32(needle.CurrentVersion) + } + + if v.hasRemoteFile { glog.V(0).Infof("volume %d is tiered to %s as %s and read only", v.Id, v.volumeInfo.Files[0].BackendName(), v.volumeInfo.Files[0].Key) - v.hasRemoteFile = true + } + + if err != nil { + glog.Warningf("load volume %d.vif file: %v", v.Id, err) + return } return @@ -44,7 +54,7 @@ func (v *Volume) LoadRemoteFile() error { func (v *Volume) SaveVolumeInfo() error { - tierFileName := v.FileName() + ".vif" + tierFileName := v.FileName(".vif") return pb.SaveVolumeInfo(tierFileName, v.volumeInfo) diff --git a/weed/storage/volume_vacuum.go b/weed/storage/volume_vacuum.go index 434b5989d..be84f8a13 100644 --- a/weed/storage/volume_vacuum.go +++ b/weed/storage/volume_vacuum.go @@ -3,6 +3,7 @@ package storage import ( "fmt" "os" + "runtime" "time" "github.com/chrislusf/seaweedfs/weed/glog" @@ -48,15 +49,20 @@ func (v *Volume) Compact(preallocate int64, compactionBytePerSecond int64) error v.isCompacting = false }() - filePath := v.FileName() v.lastCompactIndexOffset = v.IndexFileSize() v.lastCompactRevision = v.SuperBlock.CompactionRevision glog.V(3).Infof("creating copies for volume %d ,last offset %d...", v.Id, v.lastCompactIndexOffset) - return v.copyDataAndGenerateIndexFile(filePath+".cpd", filePath+".cpx", preallocate, compactionBytePerSecond) + if err := v.DataBackend.Sync(); err != nil { + glog.V(0).Infof("compact fail to sync volume %d", v.Id) + } + if err := v.nm.Sync(); err != nil { + glog.V(0).Infof("compact fail to sync volume idx %d", v.Id) + } + return v.copyDataAndGenerateIndexFile(v.FileName(".cpd"), v.FileName(".cpx"), preallocate, compactionBytePerSecond) } // compact a volume based on deletions in .idx files -func (v *Volume) Compact2(preallocate int64) error { +func (v *Volume) Compact2(preallocate int64, compactionBytePerSecond int64) error { if v.MemoryMapMaxSizeMb != 0 { //it makes no sense to compact in memory return nil @@ -68,11 +74,16 @@ func (v *Volume) Compact2(preallocate int64) error { v.isCompacting = false }() - filePath := v.FileName() v.lastCompactIndexOffset = v.IndexFileSize() v.lastCompactRevision = v.SuperBlock.CompactionRevision glog.V(3).Infof("creating copies for volume %d ...", v.Id) - return v.copyDataBasedOnIndexFile(filePath+".cpd", 
filePath+".cpx", preallocate) + if err := v.DataBackend.Sync(); err != nil { + glog.V(0).Infof("compact2 fail to sync volume dat %d: %v", v.Id, err) + } + if err := v.nm.Sync(); err != nil { + glog.V(0).Infof("compact2 fail to sync volume idx %d: %v", v.Id, err) + } + return copyDataBasedOnIndexFile(v.FileName(".dat"), v.FileName(".idx"), v.FileName(".cpd"), v.FileName(".cpx"), v.SuperBlock, v.Version(), preallocate, compactionBytePerSecond) } func (v *Volume) CommitCompact() error { @@ -91,38 +102,49 @@ func (v *Volume) CommitCompact() error { glog.V(3).Infof("Got volume %d committing lock...", v.Id) v.nm.Close() - if err := v.DataBackend.Close(); err != nil { - glog.V(0).Infof("fail to close volume %d", v.Id) + if v.DataBackend != nil { + if err := v.DataBackend.Close(); err != nil { + glog.V(0).Infof("fail to close volume %d", v.Id) + } } v.DataBackend = nil stats.VolumeServerVolumeCounter.WithLabelValues(v.Collection, "volume").Dec() var e error - if e = v.makeupDiff(v.FileName()+".cpd", v.FileName()+".cpx", v.FileName()+".dat", v.FileName()+".idx"); e != nil { + if e = v.makeupDiff(v.FileName(".cpd"), v.FileName(".cpx"), v.FileName(".dat"), v.FileName(".idx")); e != nil { glog.V(0).Infof("makeupDiff in CommitCompact volume %d failed %v", v.Id, e) - e = os.Remove(v.FileName() + ".cpd") + e = os.Remove(v.FileName(".cpd")) if e != nil { return e } - e = os.Remove(v.FileName() + ".cpx") + e = os.Remove(v.FileName(".cpx")) if e != nil { return e } } else { + if runtime.GOOS == "windows" { + e = os.RemoveAll(v.FileName(".dat")) + if e != nil { + return e + } + e = os.RemoveAll(v.FileName(".idx")) + if e != nil { + return e + } + } var e error - if e = os.Rename(v.FileName()+".cpd", v.FileName()+".dat"); e != nil { - return fmt.Errorf("rename %s: %v", v.FileName()+".cpd", e) + if e = os.Rename(v.FileName(".cpd"), v.FileName(".dat")); e != nil { + return fmt.Errorf("rename %s: %v", v.FileName(".cpd"), e) } - if e = os.Rename(v.FileName()+".cpx", v.FileName()+".idx"); e != nil { - return fmt.Errorf("rename %s: %v", v.FileName()+".cpx", e) + if e = os.Rename(v.FileName(".cpx"), v.FileName(".idx")); e != nil { + return fmt.Errorf("rename %s: %v", v.FileName(".cpx"), e) } } //glog.V(3).Infof("Pretending to be vacuuming...") //time.Sleep(20 * time.Second) - os.RemoveAll(v.FileName() + ".ldb") - os.RemoveAll(v.FileName() + ".bdb") + os.RemoveAll(v.FileName(".ldb")) glog.V(3).Infof("Loading volume %d commit file...", v.Id) if e = v.load(true, false, v.needleMapKind, 0); e != nil { @@ -134,8 +156,8 @@ func (v *Volume) CommitCompact() error { func (v *Volume) cleanupCompact() error { glog.V(0).Infof("Cleaning up volume %d vacuuming...", v.Id) - e1 := os.Remove(v.FileName() + ".cpd") - e2 := os.Remove(v.FileName() + ".cpx") + e1 := os.Remove(v.FileName(".cpd")) + e2 := os.Remove(v.FileName(".cpx")) if e1 != nil { return e1 } @@ -158,9 +180,15 @@ func (v *Volume) makeupDiff(newDatFileName, newIdxFileName, oldDatFileName, oldI var indexSize int64 oldIdxFile, err := os.Open(oldIdxFileName) + if err != nil { + return fmt.Errorf("makeupDiff open %s failed: %v", oldIdxFileName, err) + } defer oldIdxFile.Close() oldDatFile, err := os.Open(oldDatFileName) + if err != nil { + return fmt.Errorf("makeupDiff open %s failed: %v", oldDatFileName, err) + } oldDatBackend := backend.NewDiskFile(oldDatFile) defer oldDatBackend.Close() @@ -183,7 +211,7 @@ func (v *Volume) makeupDiff(newDatFileName, newIdxFileName, oldDatFileName, oldI type keyField struct { offset Offset - size uint32 + size Size } 
 	incrementedHasUpdatedIndexEntry := make(map[NeedleId]keyField)
@@ -250,15 +278,15 @@ func (v *Volume) makeupDiff(newDatFileName, newIdxFileName, oldDatFileName, oldI
 		}
 
 		//updated needle
-		if !increIdxEntry.offset.IsZero() && increIdxEntry.size != 0 && increIdxEntry.size != TombstoneFileSize {
+		if !increIdxEntry.offset.IsZero() && increIdxEntry.size != 0 && increIdxEntry.size.IsValid() {
 			//even the needle cache in memory is hit, the need_bytes is correct
-			glog.V(4).Infof("file %d offset %d size %d", key, increIdxEntry.offset.ToAcutalOffset(), increIdxEntry.size)
+			glog.V(4).Infof("file %d offset %d size %d", key, increIdxEntry.offset.ToActualOffset(), increIdxEntry.size)
 			var needleBytes []byte
-			needleBytes, err = needle.ReadNeedleBlob(oldDatBackend, increIdxEntry.offset.ToAcutalOffset(), increIdxEntry.size, v.Version())
+			needleBytes, err = needle.ReadNeedleBlob(oldDatBackend, increIdxEntry.offset.ToActualOffset(), increIdxEntry.size, v.Version())
 			if err != nil {
-				return fmt.Errorf("ReadNeedleBlob %s key %d offset %d size %d failed: %v", oldDatFile.Name(), key, increIdxEntry.offset.ToAcutalOffset(), increIdxEntry.size, err)
+				return fmt.Errorf("ReadNeedleBlob %s key %d offset %d size %d failed: %v", oldDatFile.Name(), key, increIdxEntry.offset.ToActualOffset(), increIdxEntry.size, err)
 			}
-			dst.Write(needleBytes)
+			dstDatBackend.Write(needleBytes)
 			util.Uint32toBytes(idxEntryBytes[8:12], uint32(offset/NeedlePaddingSize))
 		} else { //deleted needle
 			//fakeDelNeedle 's default Data field is nil
@@ -311,7 +339,7 @@ func (scanner *VolumeFileScanner4Vacuum) VisitNeedle(n *needle.Needle, offset in
 	}
 	nv, ok := scanner.v.nm.Get(n.Id)
 	glog.V(4).Infoln("needle expected offset ", offset, "ok", ok, "nv", nv)
-	if ok && nv.Offset.ToAcutalOffset() == offset && nv.Size > 0 && nv.Size != TombstoneFileSize {
+	if ok && nv.Offset.ToActualOffset() == offset && nv.Size > 0 && nv.Size.IsValid() {
 		if err := scanner.nm.Set(n.Id, ToOffset(scanner.newOffset), n.Size); err != nil {
 			return fmt.Errorf("cannot put needle: %s", err)
 		}
@@ -330,12 +358,13 @@ func (v *Volume) copyDataAndGenerateIndexFile(dstName, idxName string, prealloca
 	var (
 		dst backend.BackendStorageFile
 	)
-	if dst, err = createVolumeFile(dstName, preallocate, 0); err != nil {
+	if dst, err = backend.CreateVolumeFile(dstName, preallocate, 0); err != nil {
 		return
 	}
 	defer dst.Close()
 
 	nm := needle_map.NewMemDb()
+	defer nm.Close()
 
 	scanner := &VolumeFileScanner4Vacuum{
 		v: v,
@@ -353,64 +382,70 @@ func (v *Volume) copyDataAndGenerateIndexFile(dstName, idxName string, prealloca
 	return
 }
 
-func (v *Volume) copyDataBasedOnIndexFile(dstName, idxName string, preallocate int64) (err error) {
+func copyDataBasedOnIndexFile(srcDatName, srcIdxName, dstDatName, datIdxName string, sb super_block.SuperBlock, version needle.Version, preallocate int64, compactionBytePerSecond int64) (err error) {
 	var (
-		dstDatBackend backend.BackendStorageFile
-		oldIndexFile  *os.File
+		srcDatBackend, dstDatBackend backend.BackendStorageFile
+		dataFile                     *os.File
 	)
-	if dstDatBackend, err = createVolumeFile(dstName, preallocate, 0); err != nil {
+	if dstDatBackend, err = backend.CreateVolumeFile(dstDatName, preallocate, 0); err != nil {
 		return
 	}
 	defer dstDatBackend.Close()
 
-	if oldIndexFile, err = os.OpenFile(v.FileName()+".idx", os.O_RDONLY, 0644); err != nil {
+	oldNm := needle_map.NewMemDb()
+	defer oldNm.Close()
+	newNm := needle_map.NewMemDb()
+	defer newNm.Close()
+	if err = oldNm.LoadFromIdx(srcIdxName); err != nil {
 		return
 	}
-	defer oldIndexFile.Close()
-
-	nm := needle_map.NewMemDb()
+	if dataFile, err = os.Open(srcDatName); err != nil {
+		return err
+	}
+	srcDatBackend = backend.NewDiskFile(dataFile)
+	defer srcDatBackend.Close()
 
 	now := uint64(time.Now().Unix())
 
-	v.SuperBlock.CompactionRevision++
-	dstDatBackend.WriteAt(v.SuperBlock.Bytes(), 0)
-	newOffset := int64(v.SuperBlock.BlockSize())
+	sb.CompactionRevision++
+	dstDatBackend.WriteAt(sb.Bytes(), 0)
+	newOffset := int64(sb.BlockSize())
 
-	idx2.WalkIndexFile(oldIndexFile, func(key NeedleId, offset Offset, size uint32) error {
-		if offset.IsZero() || size == TombstoneFileSize {
-			return nil
-		}
+	writeThrottler := util.NewWriteThrottler(compactionBytePerSecond)
+
+	oldNm.AscendingVisit(func(value needle_map.NeedleValue) error {
+
+		offset, size := value.Offset, value.Size
 
-		nv, ok := v.nm.Get(key)
-		if !ok {
+		if offset.IsZero() || size.IsDeleted() {
 			return nil
 		}
 
 		n := new(needle.Needle)
-		err := n.ReadData(v.DataBackend, offset.ToAcutalOffset(), size, v.Version())
+		err := n.ReadData(srcDatBackend, offset.ToActualOffset(), size, version)
 		if err != nil {
 			return nil
 		}
 
-		if n.HasTtl() && now >= n.LastModified+uint64(v.Ttl.Minutes()*60) {
+		if n.HasTtl() && now >= n.LastModified+uint64(sb.Ttl.Minutes()*60) {
 			return nil
 		}
 
-		glog.V(4).Infoln("needle expected offset ", offset, "ok", ok, "nv", nv)
-		if nv.Offset == offset && nv.Size > 0 {
-			if err = nm.Set(n.Id, ToOffset(newOffset), n.Size); err != nil {
-				return fmt.Errorf("cannot put needle: %s", err)
-			}
-			if _, _, _, err = n.Append(dstDatBackend, v.Version()); err != nil {
-				return fmt.Errorf("cannot append needle: %s", err)
-			}
-			newOffset += n.DiskSize(v.Version())
-			glog.V(3).Infoln("saving key", n.Id, "volume offset", offset, "=>", newOffset, "data_size", n.Size)
+		if err = newNm.Set(n.Id, ToOffset(newOffset), n.Size); err != nil {
+			return fmt.Errorf("cannot put needle: %s", err)
 		}
+		if _, _, _, err = n.Append(dstDatBackend, sb.Version); err != nil {
+			return fmt.Errorf("cannot append needle: %s", err)
+		}
+		delta := n.DiskSize(version)
+		newOffset += delta
+		writeThrottler.MaybeSlowdown(delta)
+		glog.V(4).Infoln("saving key", n.Id, "volume offset", offset, "=>", newOffset, "data_size", n.Size)
+
 		return nil
 	})
 
-	nm.SaveToIdx(idxName)
+	newNm.SaveToIdx(datIdxName)
 
 	return
 }
diff --git a/weed/storage/volume_vacuum_test.go b/weed/storage/volume_vacuum_test.go
index 95f43d6ec..cd5a4f430 100644
--- a/weed/storage/volume_vacuum_test.go
+++ b/weed/storage/volume_vacuum_test.go
@@ -69,7 +69,7 @@ func TestCompaction(t *testing.T) {
 	}
 	defer os.RemoveAll(dir) // clean up
 
-	v, err := NewVolume(dir, "", 1, NeedleMapInMemory, &super_block.ReplicaPlacement{}, &needle.TTL{}, 0, 0)
+	v, err := NewVolume(dir, dir, "", 1, NeedleMapInMemory, &super_block.ReplicaPlacement{}, &needle.TTL{}, 0, 0)
 	if err != nil {
 		t.Fatalf("volume creation: %v", err)
 	}
@@ -84,7 +84,7 @@ func TestCompaction(t *testing.T) {
 	}
 
 	startTime := time.Now()
-	v.Compact2(0)
+	v.Compact2(0, 0)
 	speed := float64(v.ContentSize()) / time.Now().Sub(startTime).Seconds()
 	t.Logf("compaction speed: %.2f bytes/s", speed)
@@ -96,7 +96,7 @@ func TestCompaction(t *testing.T) {
 
 	v.Close()
 
-	v, err = NewVolume(dir, "", 1, NeedleMapInMemory, nil, nil, 0, 0)
+	v, err = NewVolume(dir, dir, "", 1, NeedleMapInMemory, nil, nil, 0, 0)
 	if err != nil {
 		t.Fatalf("volume reloading: %v", err)
 	}
@@ -113,11 +113,11 @@ func TestCompaction(t *testing.T) {
 	}
 
 	n := newEmptyNeedle(uint64(i))
-	size, err := v.readNeedle(n)
+	size, err := v.readNeedle(n, nil)
 	if err != nil {
 		t.Fatalf("read file %d: %v", i, err)
 	}
-	if infos[i-1].size != uint32(size) {
+	if infos[i-1].size != types.Size(size) {
 		t.Fatalf("read file %d size mismatch expected %d found %d", i, infos[i-1].size, size)
 	}
 	if infos[i-1].crc != n.Checksum {
@@ -129,7 +129,7 @@ func TestCompaction(t *testing.T) {
 }
 func doSomeWritesDeletes(i int, v *Volume, t *testing.T, infos []*needleInfo) {
 	n := newRandomNeedle(uint64(i))
-	_, size, _, err := v.writeNeedle(n)
+	_, size, _, err := v.writeNeedle2(n, false)
 	if err != nil {
 		t.Fatalf("write file %d: %v", i, err)
 	}
@@ -141,7 +141,7 @@ func doSomeWritesDeletes(i int, v *Volume, t *testing.T, infos []*needleInfo) {
 	if rand.Float64() < 0.03 {
 		toBeDeleted := rand.Intn(i) + 1
 		oldNeedle := newEmptyNeedle(uint64(toBeDeleted))
-		v.deleteNeedle(oldNeedle)
+		v.deleteNeedle2(oldNeedle)
 		// println("deleted file", toBeDeleted)
 		infos[toBeDeleted-1] = &needleInfo{
 			size: 0,
@@ -151,7 +151,7 @@ func doSomeWritesDeletes(i int, v *Volume, t *testing.T, infos []*needleInfo) {
 }
 
 type needleInfo struct {
-	size uint32
+	size types.Size
 	crc  needle.CRC
 }
diff --git a/weed/storage/volume_write.go b/weed/storage/volume_write.go
new file mode 100644
index 000000000..a286c5dd5
--- /dev/null
+++ b/weed/storage/volume_write.go
@@ -0,0 +1,327 @@
+package storage
+
+import (
+	"bytes"
+	"errors"
+	"fmt"
+	"os"
+	"time"
+
+	"github.com/chrislusf/seaweedfs/weed/glog"
+	"github.com/chrislusf/seaweedfs/weed/storage/backend"
+	"github.com/chrislusf/seaweedfs/weed/storage/needle"
+	. "github.com/chrislusf/seaweedfs/weed/storage/types"
+)
+
+var ErrorNotFound = errors.New("not found")
+var ErrorDeleted = errors.New("already deleted")
+var ErrorSizeMismatch = errors.New("size mismatch")
+
+func (v *Volume) checkReadWriteError(err error) {
+	if err == nil {
+		if v.lastIoError != nil {
+			v.lastIoError = nil
+		}
+		return
+	}
+	if err.Error() == "input/output error" {
+		v.lastIoError = err
+	}
+}
+
+// isFileUnchanged checks whether this needle to write is the same as the last one.
+// It requires serialized access in the same volume.
+func (v *Volume) isFileUnchanged(n *needle.Needle) bool {
+	if v.Ttl.String() != "" {
+		return false
+	}
+
+	nv, ok := v.nm.Get(n.Id)
+	if ok && !nv.Offset.IsZero() && nv.Size.IsValid() {
+		oldNeedle := new(needle.Needle)
+		err := oldNeedle.ReadData(v.DataBackend, nv.Offset.ToActualOffset(), nv.Size, v.Version())
+		if err != nil {
+			glog.V(0).Infof("Failed to check updated file at offset %d size %d: %v", nv.Offset.ToActualOffset(), nv.Size, err)
+			return false
+		}
+		if oldNeedle.Cookie == n.Cookie && oldNeedle.Checksum == n.Checksum && bytes.Equal(oldNeedle.Data, n.Data) {
+			n.DataSize = oldNeedle.DataSize
+			return true
+		}
+	}
+	return false
+}
+
+// Destroy removes everything related to this volume
+func (v *Volume) Destroy() (err error) {
+	if v.isCompacting {
+		err = fmt.Errorf("volume %d is compacting", v.Id)
+		return
+	}
+	close(v.asyncRequestsChan)
+	storageName, storageKey := v.RemoteStorageNameKey()
+	if v.HasRemoteFile() && storageName != "" && storageKey != "" {
+		if backendStorage, found := backend.BackendStorages[storageName]; found {
+			backendStorage.DeleteFile(storageKey)
+		}
+	}
+	v.Close()
+	removeVolumeFiles(v.DataFileName())
+	removeVolumeFiles(v.IndexFileName())
+	return
+}
+
+func removeVolumeFiles(filename string) {
+	// basic
+	os.Remove(filename + ".dat")
+	os.Remove(filename + ".idx")
+	os.Remove(filename + ".vif")
+	// sorted index file
+	os.Remove(filename + ".sdx")
+	// compaction
+	os.Remove(filename + ".cpd")
+	os.Remove(filename + ".cpx")
+	// leveldb index file
+	os.RemoveAll(filename + ".ldb")
+	// marker for damaged or incomplete volume
+	os.Remove(filename + ".note")
+}
+
+func (v *Volume) asyncRequestAppend(request *needle.AsyncRequest) {
+	v.asyncRequestsChan <- request
+}
+
+func (v *Volume) syncWrite(n *needle.Needle) (offset uint64, size Size, isUnchanged bool, err error) {
+	// glog.V(4).Infof("writing needle %s", needle.NewFileIdFromNeedle(v.Id, n).String())
+	actualSize := needle.GetActualSize(Size(len(n.Data)), v.Version())
+
+	v.dataFileAccessLock.Lock()
+	defer v.dataFileAccessLock.Unlock()
+
+	if MaxPossibleVolumeSize < v.nm.ContentSize()+uint64(actualSize) {
+		err = fmt.Errorf("volume size limit %d exceeded! current size is %d", MaxPossibleVolumeSize, v.nm.ContentSize())
+		return
+	}
+
+	return v.doWriteRequest(n)
+}
+
+func (v *Volume) writeNeedle2(n *needle.Needle, fsync bool) (offset uint64, size Size, isUnchanged bool, err error) {
+	// glog.V(4).Infof("writing needle %s", needle.NewFileIdFromNeedle(v.Id, n).String())
+	if n.Ttl == needle.EMPTY_TTL && v.Ttl != needle.EMPTY_TTL {
+		n.SetHasTtl()
+		n.Ttl = v.Ttl
+	}
+
+	if !fsync {
+		return v.syncWrite(n)
+	} else {
+		asyncRequest := needle.NewAsyncRequest(n, true)
+		// use len(n.Data) here instead of n.Size, since n.Size is not populated until n.Append()
+		asyncRequest.ActualSize = needle.GetActualSize(Size(len(n.Data)), v.Version())
+
+		v.asyncRequestAppend(asyncRequest)
+		offset, _, isUnchanged, err = asyncRequest.WaitComplete()
+
+		return
+	}
+}
+
+func (v *Volume) doWriteRequest(n *needle.Needle) (offset uint64, size Size, isUnchanged bool, err error) {
+	// glog.V(4).Infof("writing needle %s", needle.NewFileIdFromNeedle(v.Id, n).String())
+	if v.isFileUnchanged(n) {
+		size = Size(n.DataSize)
+		isUnchanged = true
+		return
+	}
+
+	// check whether existing needle cookie matches
+	nv, ok := v.nm.Get(n.Id)
+	if ok {
+		existingNeedle, _, _, existingNeedleReadErr := needle.ReadNeedleHeader(v.DataBackend, v.Version(), nv.Offset.ToActualOffset())
+		if existingNeedleReadErr != nil {
+			err = fmt.Errorf("reading existing needle: %v", existingNeedleReadErr)
+			return
+		}
+		if existingNeedle.Cookie != n.Cookie {
+			glog.V(0).Infof("write cookie mismatch: existing %s, new %s",
+				needle.NewFileIdFromNeedle(v.Id, existingNeedle), needle.NewFileIdFromNeedle(v.Id, n))
+			err = fmt.Errorf("mismatching cookie %x", n.Cookie)
+			return
+		}
+	}
+
+	// append to dat file
+	n.AppendAtNs = uint64(time.Now().UnixNano())
+	offset, size, _, err = n.Append(v.DataBackend, v.Version())
+	v.checkReadWriteError(err)
+	if err != nil {
+		return
+	}
+	v.lastAppendAtNs = n.AppendAtNs
+
+	// add to needle map
+	if !ok || uint64(nv.Offset.ToActualOffset()) < offset {
+		if err = v.nm.Put(n.Id, ToOffset(int64(offset)), n.Size); err != nil {
+			glog.V(4).Infof("failed to save in needle map %d: %v", n.Id, err)
+		}
+	}
+	if v.lastModifiedTsSeconds < n.LastModified {
+		v.lastModifiedTsSeconds = n.LastModified
+	}
+	return
+}
+
+func (v *Volume) syncDelete(n *needle.Needle) (Size, error) {
+	// glog.V(4).Infof("delete needle %s", needle.NewFileIdFromNeedle(v.Id, n).String())
+	actualSize := needle.GetActualSize(0, v.Version())
+	v.dataFileAccessLock.Lock()
+	defer v.dataFileAccessLock.Unlock()
+
+	if MaxPossibleVolumeSize < v.nm.ContentSize()+uint64(actualSize) {
+		err := fmt.Errorf("volume size limit %d exceeded! current size is %d", MaxPossibleVolumeSize, v.nm.ContentSize())
+		return 0, err
+	}
+
+	return v.doDeleteRequest(n)
+}
+
+func (v *Volume) deleteNeedle2(n *needle.Needle) (Size, error) {
+	// todo: delete info is currently appended without fsync; it may need fsync in the future
+	fsync := false
+
+	if !fsync {
+		return v.syncDelete(n)
+	} else {
+		asyncRequest := needle.NewAsyncRequest(n, false)
+		asyncRequest.ActualSize = needle.GetActualSize(0, v.Version())
+
+		v.asyncRequestAppend(asyncRequest)
+		_, size, _, err := asyncRequest.WaitComplete()
+
+		return Size(size), err
+	}
+}
+
+func (v *Volume) doDeleteRequest(n *needle.Needle) (Size, error) {
+	glog.V(4).Infof("delete needle %s", needle.NewFileIdFromNeedle(v.Id, n).String())
+	nv, ok := v.nm.Get(n.Id)
+	// fmt.Println("key", n.Id, "volume offset", nv.Offset, "data_size", n.Size, "cached size", nv.Size)
+	if ok && nv.Size.IsValid() {
+		size := nv.Size
+		n.Data = nil
+		n.AppendAtNs = uint64(time.Now().UnixNano())
+		offset, _, _, err := n.Append(v.DataBackend, v.Version())
+		v.checkReadWriteError(err)
+		if err != nil {
+			return size, err
+		}
+		v.lastAppendAtNs = n.AppendAtNs
+		if err = v.nm.Delete(n.Id, ToOffset(int64(offset))); err != nil {
+			return size, err
+		}
+		return size, err
+	}
+	return 0, nil
+}
+
+func (v *Volume) startWorker() {
+	go func() {
+		chanClosed := false
+		for {
+			// chan closed; the goroutine will exit
+			if chanClosed {
+				break
+			}
+			currentRequests := make([]*needle.AsyncRequest, 0, 128)
+			currentBytesToWrite := int64(0)
+			for {
+				request, ok := <-v.asyncRequestsChan
+				// volume may be closed
+				if !ok {
+					chanClosed = true
+					break
+				}
+				if MaxPossibleVolumeSize < v.ContentSize()+uint64(currentBytesToWrite+request.ActualSize) {
+					request.Complete(0, 0, false,
+						fmt.Errorf("volume size limit %d exceeded! current size is %d", MaxPossibleVolumeSize, v.ContentSize()))
+					break
+				}
+				currentRequests = append(currentRequests, request)
+				currentBytesToWrite += request.ActualSize
+				// submit at most 4M bytes or 128 requests at a time to decrease request delay.
+				// it also needs to break when the channel is empty to avoid an io hang.
+				if currentBytesToWrite >= 4*1024*1024 || len(currentRequests) >= 128 || len(v.asyncRequestsChan) == 0 {
+					break
+				}
+			}
+			if len(currentRequests) == 0 {
+				continue
+			}
+			v.dataFileAccessLock.Lock()
+			end, _, e := v.DataBackend.GetStat()
+			if e != nil {
+				for i := 0; i < len(currentRequests); i++ {
+					currentRequests[i].Complete(0, 0, false,
+						fmt.Errorf("cannot read current volume position: %v", e))
+				}
+				v.dataFileAccessLock.Unlock()
+				continue
+			}
+
+			for i := 0; i < len(currentRequests); i++ {
+				if currentRequests[i].IsWriteRequest {
+					offset, size, isUnchanged, err := v.doWriteRequest(currentRequests[i].N)
+					currentRequests[i].UpdateResult(offset, uint64(size), isUnchanged, err)
+				} else {
+					size, err := v.doDeleteRequest(currentRequests[i].N)
+					currentRequests[i].UpdateResult(0, uint64(size), false, err)
+				}
+			}
+
+			// if sync fails, the data is not reliable; mark the completed requests as failed and roll back
+			if err := v.DataBackend.Sync(); err != nil {
+				// todo: this may generate dirty data or cause data inconsistency; maybe weed needs to panic?
+				if te := v.DataBackend.Truncate(end); te != nil {
+					glog.V(0).Infof("Failed to truncate %s back to %d with error: %v", v.DataBackend.Name(), end, te)
+				}
+				for i := 0; i < len(currentRequests); i++ {
+					if currentRequests[i].IsSucceed() {
+						currentRequests[i].UpdateResult(0, 0, false, err)
+					}
+				}
+			}
+
+			for i := 0; i < len(currentRequests); i++ {
+				currentRequests[i].Submit()
+			}
+			v.dataFileAccessLock.Unlock()
+		}
+	}()
+}
+
+func (v *Volume) WriteNeedleBlob(needleId NeedleId, needleBlob []byte, size Size) error {
+
+	v.dataFileAccessLock.Lock()
+	defer v.dataFileAccessLock.Unlock()
+
+	if MaxPossibleVolumeSize < v.nm.ContentSize()+uint64(len(needleBlob)) {
+		return fmt.Errorf("volume size limit %d exceeded! current size is %d", MaxPossibleVolumeSize, v.nm.ContentSize())
+	}
+
+	appendAtNs := uint64(time.Now().UnixNano())
+	offset, err := needle.WriteNeedleBlob(v.DataBackend, needleBlob, size, appendAtNs, v.Version())
+
+	v.checkReadWriteError(err)
+	if err != nil {
+		return err
+	}
+	v.lastAppendAtNs = appendAtNs
+
+	// add to needle map
+	if err = v.nm.Put(needleId, ToOffset(int64(offset)), size); err != nil {
+		glog.V(4).Infof("failed to put in needle map %d: %v", needleId, err)
+	}
+
+	return err
+}
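
Note on the on-disk record that StreamWrite (above) emits: a needle is appended as a fixed header (cookie, id, size), a 4-byte data length plus the data, a flags byte, a CRC32 checksum, an append timestamp, and zero padding up to an 8-byte boundary. The following is a minimal standalone sketch of that layout; the constant names and the encodeNeedle helper are illustrative assumptions, not the exported weed/storage/types API.

package main

import (
	"encoding/binary"
	"fmt"
	"hash/crc32"
)

// Illustrative sizes only; they mirror the layout StreamWrite follows.
const (
	cookieSize    = 4
	needleIdSize  = 8
	sizeSize      = 4
	headerSize    = cookieSize + needleIdSize + sizeSize // 16 bytes
	checksumSize  = 4
	timestampSize = 8
	alignment     = 8 // records are padded to an 8-byte boundary
)

func appendUint32(b []byte, v uint32) []byte {
	var tmp [4]byte
	binary.BigEndian.PutUint32(tmp[:], v)
	return append(b, tmp[:]...)
}

func appendUint64(b []byte, v uint64) []byte {
	var tmp [8]byte
	binary.BigEndian.PutUint64(tmp[:], v)
	return append(b, tmp[:]...)
}

// encodeNeedle lays out one append-only record the way StreamWrite does:
// header | dataLen | data | flags | crc | appendAtNs | zero padding.
func encodeNeedle(cookie uint32, id uint64, data []byte, flags byte, appendAtNs uint64) []byte {
	// the size field counts the 4-byte data length, the data, and 1 flags byte
	size := uint32(4 + len(data) + 1)

	buf := make([]byte, 0, headerSize+int(size)+checksumSize+timestampSize+alignment)
	buf = appendUint32(buf, cookie)
	buf = appendUint64(buf, id)
	buf = appendUint32(buf, size)

	buf = appendUint32(buf, uint32(len(data)))
	buf = append(buf, data...)
	buf = append(buf, flags)

	// SeaweedFS uses the Castagnoli CRC32 polynomial for needle checksums
	buf = appendUint32(buf, crc32.Checksum(data, crc32.MakeTable(crc32.Castagnoli)))
	buf = appendUint64(buf, appendAtNs)

	// pad the whole record to the next 8-byte boundary
	for len(buf)%alignment != 0 {
		buf = append(buf, 0)
	}
	return buf
}

func main() {
	rec := encodeNeedle(0xdeadbeef, 42, []byte("hello"), 0, 1619430000000000000)
	fmt.Printf("record is %d bytes, 8-byte aligned: %v\n", len(rec), len(rec)%8 == 0)
}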
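
The startWorker loop in volume_write.go batches queued write and delete requests so that a single Sync covers many appends, and rolls back with Truncate when the sync fails. A stripped-down sketch of that drain-the-channel batching pattern follows; the request type and apply callback are stand-ins for needle.AsyncRequest and the real write path, not SeaweedFS code.

package main

import (
	"fmt"
	"time"
)

// request is a stand-in for needle.AsyncRequest.
type request struct {
	payload []byte
	done    chan error
}

const (
	maxBatchBytes = 4 * 1024 * 1024 // flush after ~4MB, as in startWorker
	maxBatchCount = 128             // or after 128 requests
)

// worker drains the channel into batches so one Sync can commit many writes.
func worker(ch chan *request, apply func([]*request) error) {
	for {
		first, ok := <-ch
		if !ok {
			return // channel closed, worker exits
		}
		batch := []*request{first}
		bytes := len(first.payload)
		// keep pulling while more work is queued and the limits are not hit,
		// but never block once the channel is empty (this avoids an io hang)
		for bytes < maxBatchBytes && len(batch) < maxBatchCount && len(ch) > 0 {
			r := <-ch
			batch = append(batch, r)
			bytes += len(r.payload)
		}
		err := apply(batch) // append everything, then Sync once
		for _, r := range batch {
			r.done <- err
		}
	}
}

func main() {
	ch := make(chan *request, 256)
	go worker(ch, func(b []*request) error {
		fmt.Printf("committing %d requests in one sync\n", len(b))
		return nil
	})
	r := &request{payload: make([]byte, 1024), done: make(chan error, 1)}
	ch <- r
	fmt.Println("result:", <-r.done)
	time.Sleep(10 * time.Millisecond)
	close(ch)
}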
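
StreamRead in volume_stream_write.go wraps the volume backend in a StreamReader so that a random-access io.ReaderAt can feed a sequential bufio.Reader. The same adapter works against any io.ReaderAt; below is a minimal usage sketch with strings.Reader standing in for the .dat backend, and with made-up header/payload sizes.

package main

import (
	"bufio"
	"fmt"
	"io"
	"strings"
)

// streamReader mirrors the StreamReader type in the diff: it turns an
// io.ReaderAt into an io.Reader by tracking the current offset.
type streamReader struct {
	offset   int64
	readerAt io.ReaderAt
}

func (sr *streamReader) Read(p []byte) (n int, err error) {
	n, err = sr.readerAt.ReadAt(p, sr.offset)
	sr.offset += int64(n)
	return
}

func main() {
	backend := strings.NewReader("0123456789abcdef")  // stand-in for the .dat backend
	sr := &streamReader{readerAt: backend, offset: 4} // start mid-file, like a needle offset
	r := bufio.NewReader(sr)
	r.Discard(2)                                  // skip a fixed-size header
	chunk, _ := io.ReadAll(io.LimitReader(r, 6))  // read a bounded payload
	fmt.Printf("%s\n", chunk)                     // prints "6789ab"
}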
