Diffstat (limited to 'weed/storage')
51 files changed, 1963 insertions, 893 deletions
diff --git a/weed/storage/backend/backend.go b/weed/storage/backend/backend.go
index 3c297f20b..6941ca5a1 100644
--- a/weed/storage/backend/backend.go
+++ b/weed/storage/backend/backend.go
@@ -2,18 +2,134 @@ package backend
 
 import (
 	"io"
+	"os"
+	"strings"
 	"time"
+
+	"github.com/chrislusf/seaweedfs/weed/glog"
+	"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
+	"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
+	"github.com/spf13/viper"
 )
 
-type DataStorageBackend interface {
+type BackendStorageFile interface {
 	io.ReaderAt
 	io.WriterAt
 	Truncate(off int64) error
 	io.Closer
 	GetStat() (datSize int64, modTime time.Time, err error)
-	String() string
+	Name() string
+}
+
+type BackendStorage interface {
+	ToProperties() map[string]string
+	NewStorageFile(key string, tierInfo *volume_server_pb.VolumeInfo) BackendStorageFile
+	CopyFile(f *os.File, attributes map[string]string, fn func(progressed int64, percentage float32) error) (key string, size int64, err error)
+	DownloadFile(fileName string, key string, fn func(progressed int64, percentage float32) error) (size int64, err error)
+	DeleteFile(key string) (err error)
+}
+
+type StringProperties interface {
+	GetString(key string) string
+}
+type StorageType string
+type BackendStorageFactory interface {
+	StorageType() StorageType
+	BuildStorage(configuration StringProperties, configPrefix string, id string) (BackendStorage, error)
 }
 
 var (
-	StorageBackends []DataStorageBackend
+	BackendStorageFactories = make(map[StorageType]BackendStorageFactory)
+	BackendStorages         = make(map[string]BackendStorage)
 )
+
+// used by master to load remote storage configurations
+func LoadConfiguration(config *viper.Viper) {
+
+	StorageBackendPrefix := "storage.backend"
+
+	for backendTypeName := range config.GetStringMap(StorageBackendPrefix) {
+		backendStorageFactory, found := BackendStorageFactories[StorageType(backendTypeName)]
+		if !found {
+			glog.Fatalf("backend storage type %s not found", backendTypeName)
+		}
+		for backendStorageId := range config.GetStringMap(StorageBackendPrefix + "." + backendTypeName) {
+			if !config.GetBool(StorageBackendPrefix + "." + backendTypeName + "." + backendStorageId + ".enabled") {
+				continue
+			}
+			backendStorage, buildErr := backendStorageFactory.BuildStorage(config,
+				StorageBackendPrefix+"."+backendTypeName+"."+backendStorageId+".", backendStorageId)
+			if buildErr != nil {
+				glog.Fatalf("fail to create backend storage %s.%s", backendTypeName, backendStorageId)
+			}
+			BackendStorages[backendTypeName+"."+backendStorageId] = backendStorage
+			if backendStorageId == "default" {
+				BackendStorages[backendTypeName] = backendStorage
+			}
+		}
+	}
+
+}
+
+// used by volume server to receive remote storage configurations from master
+func LoadFromPbStorageBackends(storageBackends []*master_pb.StorageBackend) {
+
+	for _, storageBackend := range storageBackends {
+		backendStorageFactory, found := BackendStorageFactories[StorageType(storageBackend.Type)]
+		if !found {
+			glog.Warningf("storage type %s not found", storageBackend.Type)
+			continue
+		}
+		backendStorage, buildErr := backendStorageFactory.BuildStorage(newProperties(storageBackend.Properties), "", storageBackend.Id)
+		if buildErr != nil {
+			glog.Fatalf("fail to create backend storage %s.%s", storageBackend.Type, storageBackend.Id)
+		}
+		BackendStorages[storageBackend.Type+"."+storageBackend.Id] = backendStorage
+		if storageBackend.Id == "default" {
+			BackendStorages[storageBackend.Type] = backendStorage
+		}
+	}
+}
+
+type Properties struct {
+	m map[string]string
+}
+
+func newProperties(m map[string]string) *Properties {
+	return &Properties{m: m}
+}
+
+func (p *Properties) GetString(key string) string {
+	if v, found := p.m[key]; found {
+		return v
+	}
+	return ""
+}
+
+func ToPbStorageBackends() (backends []*master_pb.StorageBackend) {
+	for sName, s := range BackendStorages {
+		sType, sId := BackendNameToTypeId(sName)
+		if sType == "" {
+			continue
+		}
+		backends = append(backends, &master_pb.StorageBackend{
+			Type:       sType,
+			Id:         sId,
+			Properties: s.ToProperties(),
+		})
+	}
+	return
+}
+
+func BackendNameToTypeId(backendName string) (backendType, backendId string) {
+	parts := strings.Split(backendName, ".")
+	if len(parts) == 1 {
+		return backendName, "default"
+	}
+	if len(parts) != 2 {
+		return
+	}
+
+	backendType, backendId = parts[0], parts[1]
+	return
+}
diff --git a/weed/storage/backend/disk_file.go b/weed/storage/backend/disk_file.go
index 7f2b39d15..c4b3caffb 100644
--- a/weed/storage/backend/disk_file.go
+++ b/weed/storage/backend/disk_file.go
@@ -6,7 +6,7 @@ import (
 )
 
 var (
-	_ DataStorageBackend = &DiskFile{}
+	_ BackendStorageFile = &DiskFile{}
 )
 
 type DiskFile struct {
@@ -45,6 +45,6 @@ func (df *DiskFile) GetStat() (datSize int64, modTime time.Time, err error) {
 	return 0, time.Time{}, err
 }
 
-func (df *DiskFile) String() string {
+func (df *DiskFile) Name() string {
 	return df.fullFilePath
 }
diff --git a/weed/storage/backend/memory_map/memory_map_backend.go b/weed/storage/backend/memory_map/memory_map_backend.go
index bac105022..03e7308d0 100644
--- a/weed/storage/backend/memory_map/memory_map_backend.go
+++ b/weed/storage/backend/memory_map/memory_map_backend.go
@@ -8,7 +8,7 @@ import (
 )
 
 var (
-	_ backend.DataStorageBackend = &MemoryMappedFile{}
+	_ backend.BackendStorageFile = &MemoryMappedFile{}
 )
 
 type MemoryMappedFile struct {
@@ -55,6 +55,6 @@ func (mmf *MemoryMappedFile) GetStat() (datSize int64, modTime time.Time, err er
 	return 0, time.Time{}, err
 }
 
-func (mmf *MemoryMappedFile) String() string {
+func (mmf *MemoryMappedFile) Name() string {
 	return mmf.mm.File.Name()
 }
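[Editor's note: a minimal sketch, not part of this commit, of how a third-party tier would plug into the new factory registry above. The package name mock_backend, the "mock" storage type, and the no-op method bodies are all hypothetical; only the interfaces come from the diff.]

package mock_backend

import (
	"os"

	"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
	"github.com/chrislusf/seaweedfs/weed/storage/backend"
)

// MockBackendFactory: registering it in init() is all that is needed for
// LoadConfiguration to discover "storage.backend.mock.*" entries.
type MockBackendFactory struct{}

func (f *MockBackendFactory) StorageType() backend.StorageType {
	return backend.StorageType("mock")
}

func (f *MockBackendFactory) BuildStorage(conf backend.StringProperties, prefix string, id string) (backend.BackendStorage, error) {
	// Read whatever keys this backend needs, relative to the config prefix.
	return &MockBackendStorage{id: id, dir: conf.GetString(prefix + "directory")}, nil
}

// MockBackendStorage satisfies backend.BackendStorage with no-op bodies.
type MockBackendStorage struct {
	id  string
	dir string
}

func (s *MockBackendStorage) ToProperties() map[string]string {
	return map[string]string{"directory": s.dir}
}
func (s *MockBackendStorage) NewStorageFile(key string, tierInfo *volume_server_pb.VolumeInfo) backend.BackendStorageFile {
	return nil // a real backend returns its BackendStorageFile implementation here
}
func (s *MockBackendStorage) CopyFile(f *os.File, attributes map[string]string, fn func(progressed int64, percentage float32) error) (key string, size int64, err error) {
	return "", 0, nil
}
func (s *MockBackendStorage) DownloadFile(fileName string, key string, fn func(progressed int64, percentage float32) error) (size int64, err error) {
	return 0, nil
}
func (s *MockBackendStorage) DeleteFile(key string) error { return nil }

func init() {
	backend.BackendStorageFactories["mock"] = &MockBackendFactory{}
}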
diff --git a/weed/storage/backend/s3_backend/s3_backend.go b/weed/storage/backend/s3_backend/s3_backend.go
index 0ff7eca21..8d71861c2 100644
--- a/weed/storage/backend/s3_backend/s3_backend.go
+++ b/weed/storage/backend/s3_backend/s3_backend.go
@@ -2,119 +2,176 @@ package s3_backend
 
 import (
 	"fmt"
+	"io"
+	"os"
 	"strings"
 	"time"
 
 	"github.com/aws/aws-sdk-go/service/s3"
 	"github.com/aws/aws-sdk-go/service/s3/s3iface"
+	"github.com/google/uuid"
 
+	"github.com/chrislusf/seaweedfs/weed/glog"
+	"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
 	"github.com/chrislusf/seaweedfs/weed/storage/backend"
-	"github.com/chrislusf/seaweedfs/weed/storage/needle"
-	"github.com/chrislusf/seaweedfs/weed/util"
-)
-
-var (
-	_ backend.DataStorageBackend = &S3Backend{}
 )
 
 func init() {
-	backend.StorageBackends = append(backend.StorageBackends, &S3Backend{})
+	backend.BackendStorageFactories["s3"] = &S3BackendFactory{}
 }
 
-type S3Backend struct {
-	conn   s3iface.S3API
-	region string
-	bucket string
-	dir    string
-	vid    needle.VolumeId
-	key    string
+type S3BackendFactory struct {
 }
 
-func (s3backend S3Backend) ReadAt(p []byte, off int64) (n int, err error) {
-	bytesRange := fmt.Sprintf("bytes=%d-%d", off, off+int64(len(p))-1)
-	getObjectOutput, getObjectErr := s3backend.conn.GetObject(&s3.GetObjectInput{
-		Bucket: &s3backend.bucket,
-		Key:    &s3backend.key,
-		Range:  &bytesRange,
-	})
+func (factory *S3BackendFactory) StorageType() backend.StorageType {
+	return backend.StorageType("s3")
+}
+func (factory *S3BackendFactory) BuildStorage(configuration backend.StringProperties, configPrefix string, id string) (backend.BackendStorage, error) {
+	return newS3BackendStorage(configuration, configPrefix, id)
+}
 
-	if getObjectErr != nil {
-		return 0, fmt.Errorf("bucket %s GetObject %s: %v", s3backend.bucket, s3backend.key, getObjectErr)
+type S3BackendStorage struct {
+	id                    string
+	aws_access_key_id     string
+	aws_secret_access_key string
+	region                string
+	bucket                string
+	conn                  s3iface.S3API
+}
+
+func newS3BackendStorage(configuration backend.StringProperties, configPrefix string, id string) (s *S3BackendStorage, err error) {
+	s = &S3BackendStorage{}
+	s.id = id
+	s.aws_access_key_id = configuration.GetString(configPrefix + "aws_access_key_id")
+	s.aws_secret_access_key = configuration.GetString(configPrefix + "aws_secret_access_key")
+	s.region = configuration.GetString(configPrefix + "region")
+	s.bucket = configuration.GetString(configPrefix + "bucket")
+	s.conn, err = createSession(s.aws_access_key_id, s.aws_secret_access_key, s.region)
+
+	glog.V(0).Infof("created backend storage s3.%s for region %s bucket %s", s.id, s.region, s.bucket)
+	return
+}
+
+func (s *S3BackendStorage) ToProperties() map[string]string {
+	m := make(map[string]string)
+	m["aws_access_key_id"] = s.aws_access_key_id
+	m["aws_secret_access_key"] = s.aws_secret_access_key
+	m["region"] = s.region
+	m["bucket"] = s.bucket
+	return m
+}
+
+func (s *S3BackendStorage) NewStorageFile(key string, tierInfo *volume_server_pb.VolumeInfo) backend.BackendStorageFile {
+	if strings.HasPrefix(key, "/") {
+		key = key[1:]
 	}
-	defer getObjectOutput.Body.Close()
 
-	return getObjectOutput.Body.Read(p)
+	f := &S3BackendStorageFile{
+		backendStorage: s,
+		key:            key,
+		tierInfo:       tierInfo,
+	}
 
+	return f
 }
 
-func (s3backend S3Backend) WriteAt(p []byte, off int64) (n int, err error) {
-	panic("implement me")
+func (s *S3BackendStorage) CopyFile(f *os.File, attributes map[string]string, fn func(progressed int64, percentage float32) error) (key string, size int64, err error) {
+	randomUuid, _ := uuid.NewRandom()
+	key = randomUuid.String()
+
+	glog.V(1).Infof("copying dat file of %s to remote s3.%s as %s", f.Name(), s.id, key)
+
+	size, err = uploadToS3(s.conn, f.Name(), s.bucket, key, attributes, fn)
+
+	return
 }
 
-func (s3backend S3Backend) Truncate(off int64) error {
-	panic("implement me")
+func (s *S3BackendStorage) DownloadFile(fileName string, key string, fn func(progressed int64, percentage float32) error) (size int64, err error) {
+
+	glog.V(1).Infof("download dat file of %s from remote s3.%s as %s", fileName, s.id, key)
+
+	size, err = downloadFromS3(s.conn, fileName, s.bucket, key, fn)
+
+	return
 }
 
-func (s3backend S3Backend) Close() error {
-	return nil
+func (s *S3BackendStorage) DeleteFile(key string) (err error) {
+
+	glog.V(1).Infof("delete dat file %s from remote", key)
+
+	err = deleteFromS3(s.conn, s.bucket, key)
+
+	return
 }
 
-func (s3backend S3Backend) GetStat() (datSize int64, modTime time.Time, err error) {
+type S3BackendStorageFile struct {
+	backendStorage *S3BackendStorage
+	key            string
+	tierInfo       *volume_server_pb.VolumeInfo
+}
 
-	headObjectOutput, headObjectErr := s3backend.conn.HeadObject(&s3.HeadObjectInput{
-		Bucket: &s3backend.bucket,
-		Key:    &s3backend.key,
+func (s3backendStorageFile S3BackendStorageFile) ReadAt(p []byte, off int64) (n int, err error) {
+
+	bytesRange := fmt.Sprintf("bytes=%d-%d", off, off+int64(len(p))-1)
+
+	// glog.V(0).Infof("read %s %s", s3backendStorageFile.key, bytesRange)
+
+	getObjectOutput, getObjectErr := s3backendStorageFile.backendStorage.conn.GetObject(&s3.GetObjectInput{
+		Bucket: &s3backendStorageFile.backendStorage.bucket,
+		Key:    &s3backendStorageFile.key,
+		Range:  &bytesRange,
 	})
 
-	if headObjectErr != nil {
-		return 0, time.Now(), fmt.Errorf("bucket %s HeadObject %s: %v", s3backend.bucket, s3backend.key, headObjectErr)
+	if getObjectErr != nil {
+		return 0, fmt.Errorf("bucket %s GetObject %s: %v", s3backendStorageFile.backendStorage.bucket, s3backendStorageFile.key, getObjectErr)
+	}
+	defer getObjectOutput.Body.Close()
+
+	glog.V(4).Infof("read %s %s", s3backendStorageFile.key, bytesRange)
+	glog.V(4).Infof("content range: %s, contentLength: %d", *getObjectOutput.ContentRange, *getObjectOutput.ContentLength)
+
+	for {
+		if n, err = getObjectOutput.Body.Read(p); err == nil && n < len(p) {
+			p = p[n:]
+		} else {
+			break
+		}
 	}
 
-	datSize = int64(*headObjectOutput.ContentLength)
-	modTime = *headObjectOutput.LastModified
+	if err == io.EOF {
+		err = nil
+	}
 
 	return
 }
 
-func (s3backend S3Backend) String() string {
-	return fmt.Sprintf("%s/%s", s3backend.bucket, s3backend.key)
+func (s3backendStorageFile S3BackendStorageFile) WriteAt(p []byte, off int64) (n int, err error) {
+	panic("not implemented")
 }
 
-func (s3backend *S3Backend) GetName() string {
-	return "s3"
+func (s3backendStorageFile S3BackendStorageFile) Truncate(off int64) error {
+	panic("not implemented")
 }
 
-func (s3backend *S3Backend) GetSinkToDirectory() string {
-	return s3backend.dir
+func (s3backendStorageFile S3BackendStorageFile) Close() error {
+	return nil
 }
 
-func (s3backend *S3Backend) Initialize(configuration util.Configuration, vid needle.VolumeId) error {
-	glog.V(0).Infof("storage.backend.s3.region: %v", configuration.GetString("region"))
-	glog.V(0).Infof("storage.backend.s3.bucket: %v", configuration.GetString("bucket"))
-	glog.V(0).Infof("storage.backend.s3.directory: %v", configuration.GetString("directory"))
+func (s3backendStorageFile S3BackendStorageFile) GetStat() (datSize int64, modTime time.Time, err error) {
 
-	return s3backend.initialize(
-		configuration.GetString("aws_access_key_id"),
-		configuration.GetString("aws_secret_access_key"),
-		configuration.GetString("region"),
-		configuration.GetString("bucket"),
-		configuration.GetString("directory"),
-		vid,
-	)
-}
-
-func (s3backend *S3Backend) initialize(awsAccessKeyId, awsSecretAccessKey, region, bucket, dir string,
-	vid needle.VolumeId) (err error) {
-	s3backend.region = region
-	s3backend.bucket = bucket
-	s3backend.dir = dir
-	s3backend.conn, err = createSession(awsAccessKeyId, awsSecretAccessKey, region)
+	files := s3backendStorageFile.tierInfo.GetFiles()
 
-	s3backend.vid = vid
-	s3backend.key = fmt.Sprintf("%s/%d.dat", dir, vid)
-	if strings.HasPrefix(s3backend.key, "/") {
-		s3backend.key = s3backend.key[1:]
+	if len(files) == 0 {
+		err = fmt.Errorf("remote file info not found")
+		return
 	}
 
-	return err
+	datSize = int64(files[0].FileSize)
+	modTime = time.Unix(int64(files[0].ModifiedTime), 0)
+
+	return
+}
+
+func (s3backendStorageFile S3BackendStorageFile) Name() string {
+	return s3backendStorageFile.key
+}
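[Editor's note: a small sketch, not part of this commit, showing how the "s3" factory registered above consumes configuration keys. The props type and all credential values are placeholders; the key names mirror those read by newS3BackendStorage, and an empty configPrefix matches how LoadFromPbStorageBackends calls BuildStorage.]

package main

import (
	"fmt"

	"github.com/chrislusf/seaweedfs/weed/storage/backend"
	_ "github.com/chrislusf/seaweedfs/weed/storage/backend/s3_backend" // registers the "s3" factory via init()
)

// props is a tiny StringProperties implementation over a map, standing in
// for the viper-backed configuration the master uses.
type props map[string]string

func (p props) GetString(key string) string { return p[key] }

func main() {
	factory := backend.BackendStorageFactories[backend.StorageType("s3")]
	storage, err := factory.BuildStorage(props{
		"aws_access_key_id":     "AKIA...",        // placeholder
		"aws_secret_access_key": "...",            // placeholder
		"region":                "us-east-1",      // placeholder
		"bucket":                "my-volume-tier", // placeholder
	}, "", "default")
	if err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println(storage.ToProperties())
}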
diff --git a/weed/storage/backend/s3_backend/s3_download.go b/weed/storage/backend/s3_backend/s3_download.go
new file mode 100644
index 000000000..dbc28446a
--- /dev/null
+++ b/weed/storage/backend/s3_backend/s3_download.go
@@ -0,0 +1,98 @@
+package s3_backend
+
+import (
+	"fmt"
+	"os"
+	"sync/atomic"
+
+	"github.com/aws/aws-sdk-go/aws"
+	"github.com/aws/aws-sdk-go/service/s3"
+	"github.com/aws/aws-sdk-go/service/s3/s3iface"
+	"github.com/aws/aws-sdk-go/service/s3/s3manager"
+
+	"github.com/chrislusf/seaweedfs/weed/glog"
+)
+
+func downloadFromS3(sess s3iface.S3API, destFileName string, sourceBucket string, sourceKey string,
+	fn func(progressed int64, percentage float32) error) (fileSize int64, err error) {
+
+	fileSize, err = getFileSize(sess, sourceBucket, sourceKey)
+	if err != nil {
+		return
+	}
+
+	// open the destination file
+	f, err := os.OpenFile(destFileName, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644)
+	if err != nil {
+		return 0, fmt.Errorf("failed to open file %q, %v", destFileName, err)
+	}
+	defer f.Close()
+
+	// Create a downloader with the session and custom options
+	downloader := s3manager.NewDownloaderWithClient(sess, func(u *s3manager.Downloader) {
+		u.PartSize = int64(64 * 1024 * 1024)
+		u.Concurrency = 5
+	})
+
+	fileWriter := &s3DownloadProgressedWriter{
+		fp:      f,
+		size:    fileSize,
+		written: 0,
+		fn:      fn,
+	}
+
+	// Download the file from S3.
+	fileSize, err = downloader.Download(fileWriter, &s3.GetObjectInput{
+		Bucket: aws.String(sourceBucket),
+		Key:    aws.String(sourceKey),
+	})
+	if err != nil {
+		return fileSize, fmt.Errorf("failed to download file %s: %v", destFileName, err)
+	}
+
+	glog.V(1).Infof("downloaded file %s\n", destFileName)
+
+	return
+}
+
+// adapted from https://github.com/aws/aws-sdk-go/pull/1868
+// and https://petersouter.xyz/s3-download-progress-bar-in-golang/
+type s3DownloadProgressedWriter struct {
+	fp      *os.File
+	size    int64
+	written int64
+	fn      func(progressed int64, percentage float32) error
+}
+
+func (w *s3DownloadProgressedWriter) WriteAt(p []byte, off int64) (int, error) {
+	n, err := w.fp.WriteAt(p, off)
+	if err != nil {
+		return n, err
+	}
+
+	// track the number of bytes written so far, then report progress
+	atomic.AddInt64(&w.written, int64(n))
+
+	if w.fn != nil {
+		written := w.written
+		if err := w.fn(written, float32(written*100)/float32(w.size)); err != nil {
+			return n, err
+		}
+	}
+
+	return n, err
+}
+
+func getFileSize(svc s3iface.S3API, bucket string, key string) (filesize int64, error error) {
+	params := &s3.HeadObjectInput{
+		Bucket: aws.String(bucket),
+		Key:    aws.String(key),
+	}
+
+	resp, err := svc.HeadObject(params)
+	if err != nil {
+		return 0, err
+	}
+
+	return *resp.ContentLength, nil
+}
diff --git a/weed/storage/backend/s3_backend/s3_sessions.go b/weed/storage/backend/s3_backend/s3_sessions.go
index cd7b7ad47..5fdbcb66b 100644
--- a/weed/storage/backend/s3_backend/s3_sessions.go
+++ b/weed/storage/backend/s3_backend/s3_sessions.go
@@ -52,3 +52,11 @@ func createSession(awsAccessKeyId, awsSecretAccessKey, region string) (s3iface.S
 
 	return t, nil
 }
+
+func deleteFromS3(sess s3iface.S3API, sourceBucket string, sourceKey string) (err error) {
+	_, err = sess.DeleteObject(&s3.DeleteObjectInput{
+		Bucket: aws.String(sourceBucket),
+		Key:    aws.String(sourceKey),
+	})
+	return err
+}
diff --git a/weed/storage/backend/s3_backend/s3_upload.go b/weed/storage/backend/s3_backend/s3_upload.go
new file mode 100644
index 000000000..500a85590
--- /dev/null
+++ b/weed/storage/backend/s3_backend/s3_upload.go
@@ -0,0 +1,114 @@
+package s3_backend
+
+import (
+	"fmt"
+	"os"
+	"sync/atomic"
+
+	"github.com/aws/aws-sdk-go/aws"
+	"github.com/aws/aws-sdk-go/service/s3/s3iface"
+	"github.com/aws/aws-sdk-go/service/s3/s3manager"
+
+	"github.com/chrislusf/seaweedfs/weed/glog"
+)
+
+func uploadToS3(sess s3iface.S3API, filename string, destBucket string, destKey string,
+	attributes map[string]string,
+	fn func(progressed int64, percentage float32) error) (fileSize int64, err error) {
+
+	// open the source file
+	f, err := os.Open(filename)
+	if err != nil {
+		return 0, fmt.Errorf("failed to open file %q, %v", filename, err)
+	}
+	defer f.Close()
+
+	info, err := f.Stat()
+	if err != nil {
+		return 0, fmt.Errorf("failed to stat file %q, %v", filename, err)
+	}
+
+	fileSize = info.Size()
+
+	partSize := int64(64 * 1024 * 1024) // The minimum/default allowed part size is 5MB
+	for partSize*1000 < fileSize {
+		partSize *= 4
+	}
+
+	// Create an uploader with the session and custom options
+	uploader := s3manager.NewUploaderWithClient(sess, func(u *s3manager.Uploader) {
+		u.PartSize = partSize
+		u.Concurrency = 5
+	})
+
+	fileReader := &s3UploadProgressedReader{
+		fp:   f,
+		size: fileSize,
+		read: -fileSize,
+		fn:   fn,
+	}
+
+	// process tagging
+	tags := ""
+	for k, v := range attributes {
+		if len(tags) > 0 {
+			tags = tags + "&"
+		}
+		tags = tags + k + "=" + v
+	}
+
+	// Upload the file to S3.
+	var result *s3manager.UploadOutput
+	result, err = uploader.Upload(&s3manager.UploadInput{
+		Bucket:               aws.String(destBucket),
+		Key:                  aws.String(destKey),
+		Body:                 fileReader,
+		ACL:                  aws.String("private"),
+		ServerSideEncryption: aws.String("AES256"),
+		StorageClass:         aws.String("STANDARD_IA"),
+		Tagging:              aws.String(tags),
+	})
+
+	// in case it fails to upload
+	if err != nil {
+		return 0, fmt.Errorf("failed to upload file %s: %v", filename, err)
+	}
+	glog.V(1).Infof("file %s uploaded to %s\n", filename, result.Location)
+
+	return
+}
+
+// adapted from https://github.com/aws/aws-sdk-go/pull/1868
+type s3UploadProgressedReader struct {
+	fp   *os.File
+	size int64
+	read int64
+	fn   func(progressed int64, percentage float32) error
+}
+
+func (r *s3UploadProgressedReader) Read(p []byte) (int, error) {
+	return r.fp.Read(p)
+}
+
+func (r *s3UploadProgressedReader) ReadAt(p []byte, off int64) (int, error) {
+	n, err := r.fp.ReadAt(p, off)
+	if err != nil {
+		return n, err
+	}
+
+	// track the number of bytes read (i.e. uploaded) so far, then report progress
+	atomic.AddInt64(&r.read, int64(n))
+
+	if r.fn != nil {
+		read := r.read
+		if err := r.fn(read, float32(read*100)/float32(r.size)); err != nil {
+			return n, err
+		}
+	}
+
+	return n, err
+}
+
+func (r *s3UploadProgressedReader) Seek(offset int64, whence int) (int64, error) {
+	return r.fp.Seek(offset, whence)
+}
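[Editor's note: the part-size loop above exists because S3 multipart uploads allow at most 10,000 parts; quadrupling the 64MB default until roughly 1,000 parts suffice keeps well under that cap. A standalone sketch of the same arithmetic, for a quick sanity check:]

package main

import "fmt"

// partSizeFor mirrors the loop in uploadToS3: start at 64MB and quadruple
// until fewer than ~1000 parts are needed.
func partSizeFor(fileSize int64) int64 {
	partSize := int64(64 * 1024 * 1024)
	for partSize*1000 < fileSize {
		partSize *= 4
	}
	return partSize
}

func main() {
	for _, size := range []int64{1 << 30, 100 << 30, 1 << 40} {
		ps := partSizeFor(size)
		fmt.Printf("file %d GB -> part size %d MB, ~%d parts\n",
			size>>30, ps>>20, (size+ps-1)/ps)
	}
}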
glog.V(0).Infof("new volume %s error %s", name, e) - } + if err != nil { + glog.Warningf("get volume id failed, %s, err : %s", name, err) + return false + } - } + // void loading one volume more than once + l.volumesLock.RLock() + _, found := l.volumes[vid] + l.volumesLock.RUnlock() + if found { + glog.V(1).Infof("loaded volume, %v", vid) + return true + } + + v, e := NewVolume(l.Directory, collection, vid, needleMapKind, nil, nil, 0, 0) + if e != nil { + glog.V(0).Infof("new volume %s error %s", name, e) + return false } + + l.volumesLock.Lock() + l.volumes[vid] = v + l.volumesLock.Unlock() + size, _, _ := v.FileStat() + glog.V(0).Infof("data file %s, replicaPlacement=%s v=%d size=%d ttl=%s", + l.Directory+"/"+name, v.ReplicaPlacement, v.Version(), size, v.Ttl.String()) + return true } + return false } func (l *DiskLocation) concurrentLoadingVolumes(needleMapKind NeedleMapType, concurrency int) { @@ -95,7 +103,7 @@ func (l *DiskLocation) concurrentLoadingVolumes(needleMapKind NeedleMapType, con go func() { defer wg.Done() for dir := range task_queue { - l.loadExistingVolume(dir, needleMapKind) + _ = l.loadExistingVolume(dir, needleMapKind) } }() } @@ -115,29 +123,46 @@ func (l *DiskLocation) loadExistingVolumes(needleMapKind NeedleMapType) { func (l *DiskLocation) DeleteCollectionFromDiskLocation(collection string) (e error) { - l.Lock() - for k, v := range l.volumes { - if v.Collection == collection { - e = l.deleteVolumeById(k) - if e != nil { - l.Unlock() - return - } - } - } - l.Unlock() + l.volumesLock.Lock() + delVolsMap := l.unmountVolumeByCollection(collection) + l.volumesLock.Unlock() l.ecVolumesLock.Lock() - for k, v := range l.ecVolumes { - if v.Collection == collection { - e = l.deleteEcVolumeById(k) - if e != nil { - l.ecVolumesLock.Unlock() - return + delEcVolsMap := l.unmountEcVolumeByCollection(collection) + l.ecVolumesLock.Unlock() + + errChain := make(chan error, 2) + var wg sync.WaitGroup + wg.Add(2) + go func() { + for _, v := range delVolsMap { + if err := v.Destroy(); err != nil { + errChain <- err } } + wg.Done() + }() + + go func() { + for _, v := range delEcVolsMap { + v.Destroy() + } + wg.Done() + }() + + go func() { + wg.Wait() + close(errChain) + }() + + errBuilder := strings.Builder{} + for err := range errChain { + errBuilder.WriteString(err.Error()) + errBuilder.WriteString("; ") + } + if errBuilder.Len() > 0 { + e = fmt.Errorf(errBuilder.String()) } - l.ecVolumesLock.Unlock() return } @@ -156,22 +181,15 @@ func (l *DiskLocation) deleteVolumeById(vid needle.VolumeId) (e error) { } func (l *DiskLocation) LoadVolume(vid needle.VolumeId, needleMapKind NeedleMapType) bool { - if fileInfos, err := ioutil.ReadDir(l.Directory); err == nil { - for _, fileInfo := range fileInfos { - volId, _, err := l.volumeIdFromPath(fileInfo) - if vid == volId && err == nil { - l.loadExistingVolume(fileInfo, needleMapKind) - return true - } - } + if fileInfo, found := l.LocateVolume(vid); found { + return l.loadExistingVolume(fileInfo, needleMapKind) } - return false } func (l *DiskLocation) DeleteVolume(vid needle.VolumeId) error { - l.Lock() - defer l.Unlock() + l.volumesLock.Lock() + defer l.volumesLock.Unlock() _, ok := l.volumes[vid] if !ok { @@ -181,8 +199,8 @@ func (l *DiskLocation) DeleteVolume(vid needle.VolumeId) error { } func (l *DiskLocation) UnloadVolume(vid needle.VolumeId) error { - l.Lock() - defer l.Unlock() + l.volumesLock.Lock() + defer l.volumesLock.Unlock() v, ok := l.volumes[vid] if !ok { @@ -193,34 +211,48 @@ func (l *DiskLocation) UnloadVolume(vid 
needle.VolumeId) error { return nil } +func (l *DiskLocation) unmountVolumeByCollection(collectionName string) map[needle.VolumeId]*Volume { + deltaVols := make(map[needle.VolumeId]*Volume, 0) + for k, v := range l.volumes { + if v.Collection == collectionName && !v.isCompacting { + deltaVols[k] = v + } + } + + for k := range deltaVols { + delete(l.volumes, k) + } + return deltaVols +} + func (l *DiskLocation) SetVolume(vid needle.VolumeId, volume *Volume) { - l.Lock() - defer l.Unlock() + l.volumesLock.Lock() + defer l.volumesLock.Unlock() l.volumes[vid] = volume } func (l *DiskLocation) FindVolume(vid needle.VolumeId) (*Volume, bool) { - l.RLock() - defer l.RUnlock() + l.volumesLock.RLock() + defer l.volumesLock.RUnlock() v, ok := l.volumes[vid] return v, ok } func (l *DiskLocation) VolumesLen() int { - l.RLock() - defer l.RUnlock() + l.volumesLock.RLock() + defer l.volumesLock.RUnlock() return len(l.volumes) } func (l *DiskLocation) Close() { - l.Lock() + l.volumesLock.Lock() for _, v := range l.volumes { v.Close() } - l.Unlock() + l.volumesLock.Unlock() l.ecVolumesLock.Lock() for _, ecVolume := range l.ecVolumes { @@ -230,3 +262,16 @@ func (l *DiskLocation) Close() { return } + +func (l *DiskLocation) LocateVolume(vid needle.VolumeId) (os.FileInfo, bool) { + if fileInfos, err := ioutil.ReadDir(l.Directory); err == nil { + for _, fileInfo := range fileInfos { + volId, _, err := l.volumeIdFromPath(fileInfo) + if vid == volId && err == nil { + return fileInfo, true + } + } + } + + return nil, false +} diff --git a/weed/storage/disk_location_ec.go b/weed/storage/disk_location_ec.go index ba0824c6d..f6c44e966 100644 --- a/weed/storage/disk_location_ec.go +++ b/weed/storage/disk_location_ec.go @@ -169,3 +169,17 @@ func (l *DiskLocation) deleteEcVolumeById(vid needle.VolumeId) (e error) { delete(l.ecVolumes, vid) return } + +func (l *DiskLocation) unmountEcVolumeByCollection(collectionName string) map[needle.VolumeId]*erasure_coding.EcVolume { + deltaVols := make(map[needle.VolumeId]*erasure_coding.EcVolume, 0) + for k, v := range l.ecVolumes { + if v.Collection == collectionName { + deltaVols[k] = v + } + } + + for k, _ := range deltaVols { + delete(l.ecVolumes, k) + } + return deltaVols +} diff --git a/weed/storage/erasure_coding/ec_decoder.go b/weed/storage/erasure_coding/ec_decoder.go new file mode 100644 index 000000000..ae77cee3f --- /dev/null +++ b/weed/storage/erasure_coding/ec_decoder.go @@ -0,0 +1,198 @@ +package erasure_coding + +import ( + "fmt" + "io" + "os" + + "github.com/chrislusf/seaweedfs/weed/storage/backend" + "github.com/chrislusf/seaweedfs/weed/storage/idx" + "github.com/chrislusf/seaweedfs/weed/storage/needle" + "github.com/chrislusf/seaweedfs/weed/storage/needle_map" + "github.com/chrislusf/seaweedfs/weed/storage/super_block" + "github.com/chrislusf/seaweedfs/weed/storage/types" +) + +// write .idx file from .ecx and .ecj files +func WriteIdxFileFromEcIndex(baseFileName string) (err error) { + + ecxFile, openErr := os.OpenFile(baseFileName+".ecx", os.O_RDONLY, 0644) + if openErr != nil { + return fmt.Errorf("cannot open ec index %s.ecx: %v", baseFileName, openErr) + } + defer ecxFile.Close() + + idxFile, openErr := os.OpenFile(baseFileName+".idx", os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644) + if openErr != nil { + return fmt.Errorf("cannot open %s.idx: %v", baseFileName, openErr) + } + defer idxFile.Close() + + io.Copy(idxFile, ecxFile) + + err = iterateEcjFile(baseFileName, func(key types.NeedleId) error { + + bytes := needle_map.ToBytes(key, types.Offset{}, 
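[Editor's note: DeleteCollectionFromDiskLocation above fans the destroy work out to two goroutines and drains a buffered error channel into one aggregate error. A self-contained sketch of that pattern, generalized to N tasks; destroyAll and the sample error strings are hypothetical:]

package main

import (
	"fmt"
	"strings"
	"sync"
)

// destroyAll mirrors the commit's pattern: workers push errors into a
// buffered channel, one goroutine closes it after the WaitGroup drains,
// and the caller joins the errors with "; ".
func destroyAll(tasks ...func() error) error {
	errChain := make(chan error, len(tasks))
	var wg sync.WaitGroup
	wg.Add(len(tasks))
	for _, task := range tasks {
		go func(task func() error) {
			defer wg.Done()
			if err := task(); err != nil {
				errChain <- err
			}
		}(task)
	}
	go func() {
		wg.Wait()
		close(errChain)
	}()

	errBuilder := strings.Builder{}
	for err := range errChain {
		errBuilder.WriteString(err.Error())
		errBuilder.WriteString("; ")
	}
	if errBuilder.Len() > 0 {
		return fmt.Errorf(errBuilder.String())
	}
	return nil
}

func main() {
	err := destroyAll(
		func() error { return fmt.Errorf("volume 3: disk busy") }, // sample failure
		func() error { return nil },
	)
	fmt.Println(err)
}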
diff --git a/weed/storage/erasure_coding/ec_decoder.go b/weed/storage/erasure_coding/ec_decoder.go
new file mode 100644
index 000000000..ae77cee3f
--- /dev/null
+++ b/weed/storage/erasure_coding/ec_decoder.go
@@ -0,0 +1,198 @@
+package erasure_coding
+
+import (
+	"fmt"
+	"io"
+	"os"
+
+	"github.com/chrislusf/seaweedfs/weed/storage/backend"
+	"github.com/chrislusf/seaweedfs/weed/storage/idx"
+	"github.com/chrislusf/seaweedfs/weed/storage/needle"
+	"github.com/chrislusf/seaweedfs/weed/storage/needle_map"
+	"github.com/chrislusf/seaweedfs/weed/storage/super_block"
+	"github.com/chrislusf/seaweedfs/weed/storage/types"
+)
+
+// write .idx file from .ecx and .ecj files
+func WriteIdxFileFromEcIndex(baseFileName string) (err error) {
+
+	ecxFile, openErr := os.OpenFile(baseFileName+".ecx", os.O_RDONLY, 0644)
+	if openErr != nil {
+		return fmt.Errorf("cannot open ec index %s.ecx: %v", baseFileName, openErr)
+	}
+	defer ecxFile.Close()
+
+	idxFile, openErr := os.OpenFile(baseFileName+".idx", os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644)
+	if openErr != nil {
+		return fmt.Errorf("cannot open %s.idx: %v", baseFileName, openErr)
+	}
+	defer idxFile.Close()
+
+	io.Copy(idxFile, ecxFile)
+
+	err = iterateEcjFile(baseFileName, func(key types.NeedleId) error {
+
+		bytes := needle_map.ToBytes(key, types.Offset{}, types.TombstoneFileSize)
+		idxFile.Write(bytes)
+
+		return nil
+	})
+
+	return err
+}
+
+// FindDatFileSize calculates the .dat file size from the max offset entry;
+// there may be extra deletions after that entry,
+// but they are deletions anyway
+func FindDatFileSize(baseFileName string) (datSize int64, err error) {
+
+	version, err := readEcVolumeVersion(baseFileName)
+	if err != nil {
+		return 0, fmt.Errorf("read ec volume %s version: %v", baseFileName, err)
+	}
+
+	err = iterateEcxFile(baseFileName, func(key types.NeedleId, offset types.Offset, size uint32) error {
+
+		if size == types.TombstoneFileSize {
+			return nil
+		}
+
+		entryStopOffset := offset.ToAcutalOffset() + needle.GetActualSize(size, version)
+		if datSize < entryStopOffset {
+			datSize = entryStopOffset
+		}
+
+		return nil
+	})
+
+	return
+}
+
+func readEcVolumeVersion(baseFileName string) (version needle.Version, err error) {
+
+	// find volume version
+	datFile, err := os.OpenFile(baseFileName+".ec00", os.O_RDONLY, 0644)
+	if err != nil {
+		return 0, fmt.Errorf("open ec volume %s superblock: %v", baseFileName, err)
+	}
+	datBackend := backend.NewDiskFile(datFile)
+
+	superBlock, err := super_block.ReadSuperBlock(datBackend)
+	datBackend.Close()
+	if err != nil {
+		return 0, fmt.Errorf("read ec volume %s superblock: %v", baseFileName, err)
+	}
+
+	return superBlock.Version, nil
+
+}
+
+func iterateEcxFile(baseFileName string, processNeedleFn func(key types.NeedleId, offset types.Offset, size uint32) error) error {
+	ecxFile, openErr := os.OpenFile(baseFileName+".ecx", os.O_RDONLY, 0644)
+	if openErr != nil {
+		return fmt.Errorf("cannot open ec index %s.ecx: %v", baseFileName, openErr)
+	}
+	defer ecxFile.Close()
+
+	buf := make([]byte, types.NeedleMapEntrySize)
+	for {
+		n, err := ecxFile.Read(buf)
+		if n != types.NeedleMapEntrySize {
+			if err == io.EOF {
+				return nil
+			}
+			return err
+		}
+		key, offset, size := idx.IdxFileEntry(buf)
+		if processNeedleFn != nil {
+			err = processNeedleFn(key, offset, size)
+		}
+		if err != nil {
+			if err != io.EOF {
+				return err
+			}
+			return nil
+		}
+	}
+
+}
+
+func iterateEcjFile(baseFileName string, processNeedleFn func(key types.NeedleId) error) error {
+	ecjFile, openErr := os.OpenFile(baseFileName+".ecj", os.O_RDONLY, 0644)
+	if openErr != nil {
+		return fmt.Errorf("cannot open ec journal %s.ecj: %v", baseFileName, openErr)
+	}
+	defer ecjFile.Close()
+
+	buf := make([]byte, types.NeedleIdSize)
+	for {
+		n, err := ecjFile.Read(buf)
+		if n != types.NeedleIdSize {
+			if err == io.EOF {
+				return nil
+			}
+			return err
+		}
+		if processNeedleFn != nil {
+			err = processNeedleFn(types.BytesToNeedleId(buf))
+		}
+		if err != nil {
+			if err == io.EOF {
+				return nil
+			}
+			return err
+		}
+	}
+
+}
+
+// WriteDatFile generates the .dat file from the .ec00 ~ .ec09 files
+func WriteDatFile(baseFileName string, datFileSize int64) error {
+
+	datFile, openErr := os.OpenFile(baseFileName+".dat", os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644)
+	if openErr != nil {
+		return fmt.Errorf("cannot write volume %s.dat: %v", baseFileName, openErr)
+	}
+	defer datFile.Close()
+
+	inputFiles := make([]*os.File, DataShardsCount)
+
+	for shardId := 0; shardId < DataShardsCount; shardId++ {
+		shardFileName := baseFileName + ToExt(shardId)
+		inputFiles[shardId], openErr = os.OpenFile(shardFileName, os.O_RDONLY, 0)
+		if openErr != nil {
+			return openErr
+		}
+		defer inputFiles[shardId].Close()
+	}
+
+	for datFileSize >= DataShardsCount*ErasureCodingLargeBlockSize {
+		for shardId := 0; shardId < DataShardsCount; shardId++ {
+			w, err := io.CopyN(datFile, inputFiles[shardId], ErasureCodingLargeBlockSize)
+			if w != ErasureCodingLargeBlockSize {
+				return fmt.Errorf("copy %s large block %d: %v", baseFileName, shardId, err)
+			}
+			datFileSize -= ErasureCodingLargeBlockSize
+		}
+	}
+
+	for datFileSize > 0 {
+		for shardId := 0; shardId < DataShardsCount; shardId++ {
+			toRead := min(datFileSize, ErasureCodingSmallBlockSize)
+			w, err := io.CopyN(datFile, inputFiles[shardId], toRead)
+			if w != toRead {
+				return fmt.Errorf("copy %s small block %d: %v", baseFileName, shardId, err)
+			}
+			datFileSize -= toRead
+		}
+	}
+
+	return nil
+}
+
+func min(x, y int64) int64 {
+	if x > y {
+		return y
+	}
+	return x
+}
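[Editor's note: WriteDatFile above reassembles the .dat by reading the data shards round-robin, large blocks first and small blocks for the tail. A standalone sketch of the same read plan; the 1GB large-block size is an assumption taken from the repo's ec_encoder.go constants (only the 1MB small block is visible in this diff):]

package main

import "fmt"

const (
	dataShardsCount = 10
	largeBlockSize  = 1024 * 1024 * 1024 // 1GB, assumed from ec_encoder.go
	smallBlockSize  = 1024 * 1024        // 1MB, shown in this diff
)

// readPlan mirrors WriteDatFile's two loops: whole rounds of large blocks
// across all data shards, then small-block rounds for the remainder.
func readPlan(datFileSize int64) (largeRounds, smallCopies int64) {
	for datFileSize >= dataShardsCount*largeBlockSize {
		largeRounds++
		datFileSize -= dataShardsCount * largeBlockSize
	}
	for datFileSize > 0 {
		for shard := 0; shard < dataShardsCount && datFileSize > 0; shard++ {
			n := int64(smallBlockSize)
			if datFileSize < n {
				n = datFileSize
			}
			smallCopies++
			datFileSize -= n
		}
	}
	return
}

func main() {
	large, small := readPlan(25<<30 + 3<<20) // a hypothetical 25GB+3MB volume
	fmt.Printf("%d large-block rounds, %d small-block copies\n", large, small)
}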
diff --git a/weed/storage/erasure_coding/ec_encoder.go b/weed/storage/erasure_coding/ec_encoder.go
index 97010a1ed..97c3ccbd9 100644
--- a/weed/storage/erasure_coding/ec_encoder.go
+++ b/weed/storage/erasure_coding/ec_encoder.go
@@ -5,12 +5,13 @@ import (
 	"io"
 	"os"
 
+	"github.com/klauspost/reedsolomon"
+
 	"github.com/chrislusf/seaweedfs/weed/glog"
 	"github.com/chrislusf/seaweedfs/weed/storage/idx"
 	"github.com/chrislusf/seaweedfs/weed/storage/needle_map"
 	"github.com/chrislusf/seaweedfs/weed/storage/types"
 	"github.com/chrislusf/seaweedfs/weed/util"
-	"github.com/klauspost/reedsolomon"
 )
 
 const (
@@ -21,35 +22,38 @@ const (
 	ErasureCodingSmallBlockSize = 1024 * 1024 // 1MB
 )
 
-// WriteSortedEcxFile generates .ecx file from existing .idx file
+// WriteSortedFileFromIdx generates .ecx file from existing .idx file
 // all keys are sorted in ascending order
-func WriteSortedEcxFile(baseFileName string) (e error) {
+func WriteSortedFileFromIdx(baseFileName string, ext string) (e error) {
 
-	cm, err := readCompactMap(baseFileName)
+	nm, err := readNeedleMap(baseFileName)
+	if nm != nil {
+		defer nm.Close()
+	}
 	if err != nil {
-		return fmt.Errorf("readCompactMap: %v", err)
+		return fmt.Errorf("readNeedleMap: %v", err)
 	}
 
-	ecxFile, err := os.OpenFile(baseFileName+".ecx", os.O_TRUNC|os.O_CREATE|os.O_WRONLY, 0644)
+	ecxFile, err := os.OpenFile(baseFileName+ext, os.O_TRUNC|os.O_CREATE|os.O_WRONLY, 0644)
 	if err != nil {
 		return fmt.Errorf("failed to open ecx file: %v", err)
 	}
 	defer ecxFile.Close()
 
-	err = cm.AscendingVisit(func(value needle_map.NeedleValue) error {
+	err = nm.AscendingVisit(func(value needle_map.NeedleValue) error {
 		bytes := value.ToBytes()
 		_, writeErr := ecxFile.Write(bytes)
 		return writeErr
 	})
 
 	if err != nil {
-		return fmt.Errorf("failed to visit ecx file: %v", err)
+		return fmt.Errorf("failed to visit idx file: %v", err)
 	}
 
 	return nil
 }
 
-// WriteEcFiles generates .ec01 ~ .ec14 files
+// WriteEcFiles generates .ec00 ~ .ec13 files
 func WriteEcFiles(baseFileName string) error {
 	return generateEcFiles(baseFileName, 256*1024, ErasureCodingLargeBlockSize, ErasureCodingSmallBlockSize)
 }
@@ -195,7 +199,7 @@ func encodeDatFile(remainingSize int64, err error, baseFileName string, bufferSi
 	}
 
 	buffers := make([][]byte, TotalShardsCount)
-	for i, _ := range buffers {
+	for i := range buffers {
 		buffers[i] = make([]byte, bufferSize)
 	}
 
@@ -232,7 +236,7 @@ func rebuildEcFiles(shardHasData []bool, inputFiles []*os.File, outputFiles []*o
 	}
 
 	buffers := make([][]byte, TotalShardsCount)
-	for i, _ := range buffers {
+	for i := range buffers {
 		if shardHasData[i] {
 			buffers[i] = make([]byte, ErasureCodingSmallBlockSize)
 		}
@@ -280,14 +284,14 @@ func rebuildEcFiles(shardHasData []bool, inputFiles []*os.File, outputFiles []*o
 
 }
 
-func readCompactMap(baseFileName string) (*needle_map.CompactMap, error) {
+func readNeedleMap(baseFileName string) (*needle_map.MemDb, error) {
 	indexFile, err := os.OpenFile(baseFileName+".idx", os.O_RDONLY, 0644)
 	if err != nil {
 		return nil, fmt.Errorf("cannot read Volume Index %s.idx: %v", baseFileName, err)
 	}
 	defer indexFile.Close()
 
-	cm := needle_map.NewCompactMap()
+	cm := needle_map.NewMemDb()
 	err = idx.WalkIndexFile(indexFile, func(key types.NeedleId, offset types.Offset, size uint32) error {
 		if !offset.IsZero() && size != types.TombstoneFileSize {
 			cm.Set(key, offset, size)
diff --git a/weed/storage/erasure_coding/ec_test.go b/weed/storage/erasure_coding/ec_test.go
index 57df09525..92b83cdc8 100644
--- a/weed/storage/erasure_coding/ec_test.go
+++ b/weed/storage/erasure_coding/ec_test.go
@@ -7,9 +7,10 @@ import (
 	"os"
 	"testing"
 
+	"github.com/klauspost/reedsolomon"
+
 	"github.com/chrislusf/seaweedfs/weed/storage/needle_map"
 	"github.com/chrislusf/seaweedfs/weed/storage/types"
-	"github.com/klauspost/reedsolomon"
 )
 
 const (
@@ -26,14 +27,14 @@ func TestEncodingDecoding(t *testing.T) {
 		t.Logf("generateEcFiles: %v", err)
 	}
 
-	err = WriteSortedEcxFile(baseFileName)
+	err = WriteSortedFileFromIdx(baseFileName, ".ecx")
 	if err != nil {
-		t.Logf("WriteSortedEcxFile: %v", err)
+		t.Logf("WriteSortedFileFromIdx: %v", err)
 	}
 
 	err = validateFiles(baseFileName)
 	if err != nil {
-		t.Logf("WriteSortedEcxFile: %v", err)
+		t.Logf("WriteSortedFileFromIdx: %v", err)
 	}
 
 	removeGeneratedFiles(baseFileName)
@@ -41,9 +42,10 @@ func TestEncodingDecoding(t *testing.T) {
 }
 
 func validateFiles(baseFileName string) error {
-	cm, err := readCompactMap(baseFileName)
+	nm, err := readNeedleMap(baseFileName)
+	defer nm.Close()
 	if err != nil {
-		return fmt.Errorf("readCompactMap: %v", err)
+		return fmt.Errorf("readNeedleMap: %v", err)
 	}
 
 	datFile, err := os.OpenFile(baseFileName+".dat", os.O_RDONLY, 0)
@@ -60,7 +62,7 @@ func validateFiles(baseFileName string) error {
 	ecFiles, err := openEcFiles(baseFileName, true)
 	defer closeEcFiles(ecFiles)
 
-	err = cm.AscendingVisit(func(value needle_map.NeedleValue) error {
+	err = nm.AscendingVisit(func(value needle_map.NeedleValue) error {
 		return assertSame(datFile, fi.Size(), ecFiles, value.Offset, value.Size)
 	})
 	if err != nil {
diff --git a/weed/storage/erasure_coding/ec_volume.go b/weed/storage/erasure_coding/ec_volume.go
index bcae164ca..3d9aa2cff 100644
--- a/weed/storage/erasure_coding/ec_volume.go
+++ b/weed/storage/erasure_coding/ec_volume.go
@@ -9,7 +9,9 @@ import (
 	"sync"
 	"time"
 
+	"github.com/chrislusf/seaweedfs/weed/pb"
 	"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
+	"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
 	"github.com/chrislusf/seaweedfs/weed/storage/idx"
 	"github.com/chrislusf/seaweedfs/weed/storage/needle"
 	"github.com/chrislusf/seaweedfs/weed/storage/types"
@@ -56,6 +58,14 @@ func NewEcVolume(dir string, collection string, vid needle.VolumeId) (ev *EcVolu
 		return nil, fmt.Errorf("cannot open ec volume journal %s.ecj: %v", baseFileName, err)
 	}
 
+	// read volume info
+	ev.Version = needle.Version3
+	if volumeInfo, found, _ := pb.MaybeLoadVolumeInfo(baseFileName + ".vif"); found {
+		ev.Version = needle.Version(volumeInfo.Version)
+	} else {
+		pb.SaveVolumeInfo(baseFileName+".vif", &volume_server_pb.VolumeInfo{Version: uint32(ev.Version)})
+	}
+
 	ev.ShardLocations = make(map[ShardId][]string)
 
 	return
@@ -126,6 +136,7 @@ func (ev *EcVolume) Destroy() {
 	}
 	os.Remove(ev.FileName() + ".ecx")
 	os.Remove(ev.FileName() + ".ecj")
+	os.Remove(ev.FileName() + ".vif")
 }
 
 func (ev *EcVolume) FileName() string {
@@ -186,10 +197,10 @@ func (ev *EcVolume) LocateEcShardNeedle(needleId types.NeedleId, version needle.
 }
 
 func (ev *EcVolume) FindNeedleFromEcx(needleId types.NeedleId) (offset types.Offset, size uint32, err error) {
-	return searchNeedleFromEcx(ev.ecxFile, ev.ecxFileSize, needleId, nil)
+	return SearchNeedleFromSortedIndex(ev.ecxFile, ev.ecxFileSize, needleId, nil)
 }
 
-func searchNeedleFromEcx(ecxFile *os.File, ecxFileSize int64, needleId types.NeedleId, processNeedleFn func(file *os.File, offset int64) error) (offset types.Offset, size uint32, err error) {
+func SearchNeedleFromSortedIndex(ecxFile *os.File, ecxFileSize int64, needleId types.NeedleId, processNeedleFn func(file *os.File, offset int64) error) (offset types.Offset, size uint32, err error) {
 	var key types.NeedleId
 	buf := make([]byte, types.NeedleMapEntrySize)
 	l, h := int64(0), ecxFileSize/types.NeedleMapEntrySize
diff --git a/weed/storage/erasure_coding/ec_volume_delete.go b/weed/storage/erasure_coding/ec_volume_delete.go
index 04102ec9e..822a9e923 100644
--- a/weed/storage/erasure_coding/ec_volume_delete.go
+++ b/weed/storage/erasure_coding/ec_volume_delete.go
@@ -10,15 +10,15 @@ import (
 )
 
 var (
-	markNeedleDeleted = func(file *os.File, offset int64) error {
+	MarkNeedleDeleted = func(file *os.File, offset int64) error {
 		b := make([]byte, types.SizeSize)
 		util.Uint32toBytes(b, types.TombstoneFileSize)
 		n, err := file.WriteAt(b, offset+types.NeedleIdSize+types.OffsetSize)
 		if err != nil {
-			return fmt.Errorf("ecx write error: %v", err)
+			return fmt.Errorf("sorted needle write error: %v", err)
 		}
 		if n != types.SizeSize {
-			return fmt.Errorf("ecx written %d bytes, expecting %d", n, types.SizeSize)
+			return fmt.Errorf("sorted needle written %d bytes, expecting %d", n, types.SizeSize)
 		}
 		return nil
 	}
@@ -26,7 +26,7 @@ var (
 
 func (ev *EcVolume) DeleteNeedleFromEcx(needleId types.NeedleId) (err error) {
 
-	_, _, err = searchNeedleFromEcx(ev.ecxFile, ev.ecxFileSize, needleId, markNeedleDeleted)
+	_, _, err = SearchNeedleFromSortedIndex(ev.ecxFile, ev.ecxFileSize, needleId, MarkNeedleDeleted)
 
 	if err != nil {
 		if err == NotFoundError {
@@ -81,7 +81,7 @@ func RebuildEcxFile(baseFileName string) error {
 
 		needleId := types.BytesToNeedleId(buf)
 
-		_, _, err = searchNeedleFromEcx(ecxFile, ecxFileSize, needleId, markNeedleDeleted)
+		_, _, err = SearchNeedleFromSortedIndex(ecxFile, ecxFileSize, needleId, MarkNeedleDeleted)
 
 		if err != nil && err != NotFoundError {
 			ecxFile.Close()
diff --git a/weed/storage/erasure_coding/ec_volume_info.go b/weed/storage/erasure_coding/ec_volume_info.go
index c9e85c662..8ff65bb0f 100644
--- a/weed/storage/erasure_coding/ec_volume_info.go
+++ b/weed/storage/erasure_coding/ec_volume_info.go
@@ -81,6 +81,15 @@ func (b ShardBits) ShardIds() (ret []ShardId) {
 	return
 }
 
+func (b ShardBits) ToUint32Slice() (ret []uint32) {
+	for i := uint32(0); i < TotalShardsCount; i++ {
+		if b.HasShardId(ShardId(i)) {
+			ret = append(ret, i)
+		}
+	}
+	return
+}
+
 func (b ShardBits) ShardIdCount() (count int) {
 	for count = 0; b > 0; count++ {
 		b &= b - 1
@@ -95,3 +104,10 @@ func (b ShardBits) Minus(other ShardBits) ShardBits {
 func (b ShardBits) Plus(other ShardBits) ShardBits {
 	return b | other
 }
+
+func (b ShardBits) MinusParityShards() ShardBits {
+	for i := DataShardsCount; i < TotalShardsCount; i++ {
+		b = b.RemoveShardId(ShardId(i))
+	}
+	return b
+}
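[Editor's note: ShardBits is a bitmask over the 14 shard ids (10 data + 4 parity). A hedged usage sketch of the helpers added above; AddShardId is assumed to exist alongside the RemoveShardId/HasShardId methods this diff references:]

package main

import (
	"fmt"

	"github.com/chrislusf/seaweedfs/weed/storage/erasure_coding"
)

func main() {
	// Build a mask holding shards 0, 3 and 12.
	var b erasure_coding.ShardBits
	b = b.AddShardId(0).AddShardId(3).AddShardId(12)

	fmt.Println(b.ShardIds())     // [0 3 12]
	fmt.Println(b.ShardIdCount()) // 3

	// MinusParityShards drops ids >= DataShardsCount (10), keeping 0 and 3.
	fmt.Println(b.MinusParityShards().ToUint32Slice()) // [0 3]
}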
"github.com/klauspost/crc32" + + "github.com/chrislusf/seaweedfs/weed/util" ) var table = crc32.MakeTable(crc32.Castagnoli) @@ -29,13 +29,3 @@ func (n *Needle) Etag() string { util.Uint32toBytes(bits, uint32(n.Checksum)) return fmt.Sprintf("%x", bits) } - -func (n *Needle) MD5() string { - - hash := md5.New() - - hash.Write(n.Data) - - return fmt.Sprintf("%x", hash.Sum(nil)) - -} diff --git a/weed/storage/needle/needle.go b/weed/storage/needle/needle.go index 2f03ba87b..d3969e868 100644 --- a/weed/storage/needle/needle.go +++ b/weed/storage/needle/needle.go @@ -8,8 +8,6 @@ import ( "strings" "time" - "io/ioutil" - "github.com/chrislusf/seaweedfs/weed/images" . "github.com/chrislusf/seaweedfs/weed/storage/types" ) @@ -50,53 +48,28 @@ func (n *Needle) String() (str string) { return } -func ParseUpload(r *http.Request) ( - fileName string, data []byte, mimeType string, pairMap map[string]string, isGzipped bool, originalDataSize int, - modifiedTime uint64, ttl *TTL, isChunkedFile bool, e error) { - pairMap = make(map[string]string) - for k, v := range r.Header { - if len(v) > 0 && strings.HasPrefix(k, PairNamePrefix) { - pairMap[k] = v[0] - } - } - - if r.Method == "POST" { - fileName, data, mimeType, isGzipped, originalDataSize, isChunkedFile, e = parseMultipart(r) - } else { - isGzipped = false - mimeType = r.Header.Get("Content-Type") - fileName = "" - data, e = ioutil.ReadAll(r.Body) - originalDataSize = len(data) - } - if e != nil { - return - } - - modifiedTime, _ = strconv.ParseUint(r.FormValue("ts"), 10, 64) - ttl, _ = ReadTTL(r.FormValue("ttl")) - - return -} -func CreateNeedleFromRequest(r *http.Request, fixJpgOrientation bool) (n *Needle, originalSize int, e error) { - var pairMap map[string]string - fname, mimeType, isGzipped, isChunkedFile := "", "", false, false +func CreateNeedleFromRequest(r *http.Request, fixJpgOrientation bool, sizeLimit int64) (n *Needle, originalSize int, e error) { n = new(Needle) - fname, n.Data, mimeType, pairMap, isGzipped, originalSize, n.LastModified, n.Ttl, isChunkedFile, e = ParseUpload(r) + pu, e := ParseUpload(r, sizeLimit) if e != nil { return } - if len(fname) < 256 { - n.Name = []byte(fname) + n.Data = pu.Data + originalSize = pu.OriginalDataSize + n.LastModified = pu.ModifiedTime + n.Ttl = pu.Ttl + + if len(pu.FileName) < 256 { + n.Name = []byte(pu.FileName) n.SetHasName() } - if len(mimeType) < 256 { - n.Mime = []byte(mimeType) + if len(pu.MimeType) < 256 { + n.Mime = []byte(pu.MimeType) n.SetHasMime() } - if len(pairMap) != 0 { + if len(pu.PairMap) != 0 { trimmedPairMap := make(map[string]string) - for k, v := range pairMap { + for k, v := range pu.PairMap { trimmedPairMap[k[len(PairNamePrefix):]] = v } @@ -107,7 +80,7 @@ func CreateNeedleFromRequest(r *http.Request, fixJpgOrientation bool) (n *Needle n.SetHasPairs() } } - if isGzipped { + if pu.IsGzipped { n.SetGzipped() } if n.LastModified == 0 { @@ -118,13 +91,13 @@ func CreateNeedleFromRequest(r *http.Request, fixJpgOrientation bool) (n *Needle n.SetHasTtl() } - if isChunkedFile { + if pu.IsChunkedFile { n.SetIsChunkManifest() } if fixJpgOrientation { - loweredName := strings.ToLower(fname) - if mimeType == "image/jpeg" || strings.HasSuffix(loweredName, ".jpg") || strings.HasSuffix(loweredName, ".jpeg") { + loweredName := strings.ToLower(pu.FileName) + if pu.MimeType == "image/jpeg" || strings.HasSuffix(loweredName, ".jpg") || strings.HasSuffix(loweredName, ".jpeg") { n.Data = images.FixJpgOrientation(n.Data) } } diff --git a/weed/storage/needle/needle_parse_multipart.go 
b/weed/storage/needle/needle_parse_multipart.go deleted file mode 100644 index 8be1a1da4..000000000 --- a/weed/storage/needle/needle_parse_multipart.go +++ /dev/null @@ -1,109 +0,0 @@ -package needle - -import ( - "github.com/chrislusf/seaweedfs/weed/glog" - "github.com/chrislusf/seaweedfs/weed/util" - - "io" - "io/ioutil" - "mime" - "net/http" - "path" - "strconv" - "strings" -) - -func parseMultipart(r *http.Request) ( - fileName string, data []byte, mimeType string, isGzipped bool, originalDataSize int, isChunkedFile bool, e error) { - defer func() { - if e != nil && r.Body != nil { - io.Copy(ioutil.Discard, r.Body) - r.Body.Close() - } - }() - form, fe := r.MultipartReader() - if fe != nil { - glog.V(0).Infoln("MultipartReader [ERROR]", fe) - e = fe - return - } - - //first multi-part item - part, fe := form.NextPart() - if fe != nil { - glog.V(0).Infoln("Reading Multi part [ERROR]", fe) - e = fe - return - } - - fileName = part.FileName() - if fileName != "" { - fileName = path.Base(fileName) - } - - data, e = ioutil.ReadAll(part) - if e != nil { - glog.V(0).Infoln("Reading Content [ERROR]", e) - return - } - - //if the filename is empty string, do a search on the other multi-part items - for fileName == "" { - part2, fe := form.NextPart() - if fe != nil { - break // no more or on error, just safely break - } - - fName := part2.FileName() - - //found the first <file type> multi-part has filename - if fName != "" { - data2, fe2 := ioutil.ReadAll(part2) - if fe2 != nil { - glog.V(0).Infoln("Reading Content [ERROR]", fe2) - e = fe2 - return - } - - //update - data = data2 - fileName = path.Base(fName) - break - } - } - - originalDataSize = len(data) - - isChunkedFile, _ = strconv.ParseBool(r.FormValue("cm")) - - if !isChunkedFile { - - dotIndex := strings.LastIndex(fileName, ".") - ext, mtype := "", "" - if dotIndex > 0 { - ext = strings.ToLower(fileName[dotIndex:]) - mtype = mime.TypeByExtension(ext) - } - contentType := part.Header.Get("Content-Type") - if contentType != "" && mtype != contentType { - mimeType = contentType //only return mime type if not deductable - mtype = contentType - } - - if part.Header.Get("Content-Encoding") == "gzip" { - if unzipped, e := util.UnGzipData(data); e == nil { - originalDataSize = len(unzipped) - } - isGzipped = true - } else if util.IsGzippable(ext, mtype, data) { - if compressedData, err := util.GzipData(data); err == nil { - if len(data) > len(compressedData) { - data = compressedData - isGzipped = true - } - } - } - } - - return -} diff --git a/weed/storage/needle/needle_parse_upload.go b/weed/storage/needle/needle_parse_upload.go new file mode 100644 index 000000000..85526aaa8 --- /dev/null +++ b/weed/storage/needle/needle_parse_upload.go @@ -0,0 +1,166 @@ +package needle + +import ( + "fmt" + "io" + "io/ioutil" + "mime" + "net/http" + "path" + "strconv" + "strings" + + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/util" +) + +type ParsedUpload struct { + FileName string + Data []byte + MimeType string + PairMap map[string]string + IsGzipped bool + OriginalDataSize int + ModifiedTime uint64 + Ttl *TTL + IsChunkedFile bool + UncompressedData []byte +} + +func ParseUpload(r *http.Request, sizeLimit int64) (pu *ParsedUpload, e error) { + pu = &ParsedUpload{} + pu.PairMap = make(map[string]string) + for k, v := range r.Header { + if len(v) > 0 && strings.HasPrefix(k, PairNamePrefix) { + pu.PairMap[k] = v[0] + } + } + + if r.Method == "POST" { + e = parseMultipart(r, sizeLimit, pu) + } else { + e = parsePut(r, 
sizeLimit, pu) + } + if e != nil { + return + } + + pu.ModifiedTime, _ = strconv.ParseUint(r.FormValue("ts"), 10, 64) + pu.Ttl, _ = ReadTTL(r.FormValue("ttl")) + + pu.OriginalDataSize = len(pu.Data) + pu.UncompressedData = pu.Data + if pu.IsGzipped { + if unzipped, e := util.UnGzipData(pu.Data); e == nil { + pu.OriginalDataSize = len(unzipped) + pu.UncompressedData = unzipped + } + } else if shouldGzip, _ := util.IsGzippableFileType("", pu.MimeType); shouldGzip { + if compressedData, err := util.GzipData(pu.Data); err == nil { + pu.Data = compressedData + pu.IsGzipped = true + } + } + + return +} + +func parsePut(r *http.Request, sizeLimit int64, pu *ParsedUpload) (e error) { + pu.IsGzipped = r.Header.Get("Content-Encoding") == "gzip" + pu.MimeType = r.Header.Get("Content-Type") + pu.FileName = "" + pu.Data, e = ioutil.ReadAll(io.LimitReader(r.Body, sizeLimit+1)) + if e == io.EOF || int64(pu.OriginalDataSize) == sizeLimit+1 { + io.Copy(ioutil.Discard, r.Body) + } + r.Body.Close() + return nil +} + +func parseMultipart(r *http.Request, sizeLimit int64, pu *ParsedUpload) (e error) { + defer func() { + if e != nil && r.Body != nil { + io.Copy(ioutil.Discard, r.Body) + r.Body.Close() + } + }() + form, fe := r.MultipartReader() + if fe != nil { + glog.V(0).Infoln("MultipartReader [ERROR]", fe) + e = fe + return + } + + //first multi-part item + part, fe := form.NextPart() + if fe != nil { + glog.V(0).Infoln("Reading Multi part [ERROR]", fe) + e = fe + return + } + + pu.FileName = part.FileName() + if pu.FileName != "" { + pu.FileName = path.Base(pu.FileName) + } + + pu.Data, e = ioutil.ReadAll(io.LimitReader(part, sizeLimit+1)) + if e != nil { + glog.V(0).Infoln("Reading Content [ERROR]", e) + return + } + if len(pu.Data) == int(sizeLimit)+1 { + e = fmt.Errorf("file over the limited %d bytes", sizeLimit) + return + } + + //if the filename is empty string, do a search on the other multi-part items + for pu.FileName == "" { + part2, fe := form.NextPart() + if fe != nil { + break // no more or on error, just safely break + } + + fName := part2.FileName() + + //found the first <file type> multi-part has filename + if fName != "" { + data2, fe2 := ioutil.ReadAll(io.LimitReader(part2, sizeLimit+1)) + if fe2 != nil { + glog.V(0).Infoln("Reading Content [ERROR]", fe2) + e = fe2 + return + } + if len(data2) == int(sizeLimit)+1 { + e = fmt.Errorf("file over the limited %d bytes", sizeLimit) + return + } + + //update + pu.Data = data2 + pu.FileName = path.Base(fName) + break + } + } + + pu.IsChunkedFile, _ = strconv.ParseBool(r.FormValue("cm")) + + if !pu.IsChunkedFile { + + dotIndex := strings.LastIndex(pu.FileName, ".") + ext, mtype := "", "" + if dotIndex > 0 { + ext = strings.ToLower(pu.FileName[dotIndex:]) + mtype = mime.TypeByExtension(ext) + } + contentType := part.Header.Get("Content-Type") + if contentType != "" && contentType != "application/octet-stream" && mtype != contentType { + pu.MimeType = contentType //only return mime type if not deductable + mtype = contentType + } + + pu.IsGzipped = part.Header.Get("Content-Encoding") == "gzip" + } + + return +} diff --git a/weed/storage/needle/needle_read_write.go b/weed/storage/needle/needle_read_write.go index 8e5d18b1a..7f8aa4823 100644 --- a/weed/storage/needle/needle_read_write.go +++ b/weed/storage/needle/needle_read_write.go @@ -125,13 +125,13 @@ func (n *Needle) prepareWriteBuffer(version Version) ([]byte, uint32, int64, err return writeBytes, 0, 0, fmt.Errorf("Unsupported Version! 
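[Editor's note: a hedged handler sketch, not part of this commit, showing the reworked API: the caller now passes a hard size limit and receives a single ParsedUpload instead of ten return values. The route, port, and 32MB limit are arbitrary placeholders.]

package main

import (
	"fmt"
	"net/http"

	"github.com/chrislusf/seaweedfs/weed/storage/needle"
)

func uploadHandler(w http.ResponseWriter, r *http.Request) {
	const sizeLimit = 32 << 20 // 32MB, an example limit
	pu, err := needle.ParseUpload(r, sizeLimit)
	if err != nil {
		// Oversized bodies surface here as "file over the limited ... bytes".
		http.Error(w, err.Error(), http.StatusBadRequest)
		return
	}
	fmt.Fprintf(w, "received %s: %d bytes (original %d, gzipped=%v)\n",
		pu.FileName, len(pu.Data), pu.OriginalDataSize, pu.IsGzipped)
}

func main() {
	http.HandleFunc("/submit", uploadHandler)
	http.ListenAndServe(":8080", nil)
}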
(%d)", version) } -func (n *Needle) Append(w backend.DataStorageBackend, version Version) (offset uint64, size uint32, actualSize int64, err error) { +func (n *Needle) Append(w backend.BackendStorageFile, version Version) (offset uint64, size uint32, actualSize int64, err error) { if end, _, e := w.GetStat(); e == nil { - defer func(w backend.DataStorageBackend, off int64) { + defer func(w backend.BackendStorageFile, off int64) { if err != nil { if te := w.Truncate(end); te != nil { - glog.V(0).Infof("Failed to truncate %s back to %d with error: %v", w.String(), end, te) + glog.V(0).Infof("Failed to truncate %s back to %d with error: %v", w.Name(), end, te) } } }(w, end) @@ -150,7 +150,7 @@ func (n *Needle) Append(w backend.DataStorageBackend, version Version) (offset u return offset, size, actualSize, err } -func ReadNeedleBlob(r backend.DataStorageBackend, offset int64, size uint32, version Version) (dataSlice []byte, err error) { +func ReadNeedleBlob(r backend.BackendStorageFile, offset int64, size uint32, version Version) (dataSlice []byte, err error) { dataSize := GetActualSize(size, version) dataSlice = make([]byte, int(dataSize)) @@ -191,7 +191,7 @@ func (n *Needle) ReadBytes(bytes []byte, offset int64, size uint32, version Vers } // ReadData hydrates the needle from the file, with only n.Id is set. -func (n *Needle) ReadData(r backend.DataStorageBackend, offset int64, size uint32, version Version) (err error) { +func (n *Needle) ReadData(r backend.BackendStorageFile, offset int64, size uint32, version Version) (err error) { bytes, err := ReadNeedleBlob(r, offset, size, version) if err != nil { return err @@ -266,7 +266,7 @@ func (n *Needle) readNeedleDataVersion2(bytes []byte) (err error) { return nil } -func ReadNeedleHeader(r backend.DataStorageBackend, version Version, offset int64) (n *Needle, bytes []byte, bodyLength int64, err error) { +func ReadNeedleHeader(r backend.BackendStorageFile, version Version, offset int64) (n *Needle, bytes []byte, bodyLength int64, err error) { n = new(Needle) if version == Version1 || version == Version2 || version == Version3 { bytes = make([]byte, NeedleHeaderSize) @@ -301,7 +301,7 @@ func NeedleBodyLength(needleSize uint32, version Version) int64 { //n should be a needle already read the header //the input stream will read until next file entry -func (n *Needle) ReadNeedleBody(r backend.DataStorageBackend, version Version, offset int64, bodyLength int64) (bytes []byte, err error) { +func (n *Needle) ReadNeedleBody(r backend.BackendStorageFile, version Version, offset int64, bodyLength int64) (bytes []byte, err error) { if bodyLength <= 0 { return nil, nil diff --git a/weed/storage/needle/volume_ttl.go b/weed/storage/needle/volume_ttl.go index 4a169870d..179057876 100644 --- a/weed/storage/needle/volume_ttl.go +++ b/weed/storage/needle/volume_ttl.go @@ -69,6 +69,9 @@ func (t *TTL) ToBytes(output []byte) { } func (t *TTL) ToUint32() (output uint32) { + if t == nil || t.Count == 0 { + return 0 + } output = uint32(t.Count) << 8 output += uint32(t.Unit) return output diff --git a/weed/storage/needle_map/btree_map.go b/weed/storage/needle_map/btree_map.go deleted file mode 100644 index a26c5e068..000000000 --- a/weed/storage/needle_map/btree_map.go +++ /dev/null @@ -1,53 +0,0 @@ -package needle_map - -import ( - . 
"github.com/chrislusf/seaweedfs/weed/storage/types" - "github.com/google/btree" -) - -//This map assumes mostly inserting increasing keys -type BtreeMap struct { - tree *btree.BTree -} - -func NewBtreeMap() *BtreeMap { - return &BtreeMap{ - tree: btree.New(32), - } -} - -func (cm *BtreeMap) Set(key NeedleId, offset Offset, size uint32) (oldOffset Offset, oldSize uint32) { - found := cm.tree.ReplaceOrInsert(NeedleValue{key, offset, size}) - if found != nil { - old := found.(NeedleValue) - return old.Offset, old.Size - } - return -} - -func (cm *BtreeMap) Delete(key NeedleId) (oldSize uint32) { - found := cm.tree.Delete(NeedleValue{key, Offset{}, 0}) - if found != nil { - old := found.(NeedleValue) - return old.Size - } - return -} -func (cm *BtreeMap) Get(key NeedleId) (*NeedleValue, bool) { - found := cm.tree.Get(NeedleValue{key, Offset{}, 0}) - if found != nil { - old := found.(NeedleValue) - return &old, true - } - return nil, false -} - -// Visit visits all entries or stop if any error when visiting -func (cm *BtreeMap) AscendingVisit(visit func(NeedleValue) error) (ret error) { - cm.tree.Ascend(func(item btree.Item) bool { - needle := item.(NeedleValue) - ret = visit(needle) - return ret == nil - }) - return ret -} diff --git a/weed/storage/needle_map/compact_map_test.go b/weed/storage/needle_map/compact_map_test.go index 3bad85727..7eea3969a 100644 --- a/weed/storage/needle_map/compact_map_test.go +++ b/weed/storage/needle_map/compact_map_test.go @@ -8,7 +8,14 @@ import ( func TestOverflow2(t *testing.T) { m := NewCompactMap() - m.Set(NeedleId(150088), ToOffset(8), 3000073) + _, oldSize := m.Set(NeedleId(150088), ToOffset(8), 3000073) + if oldSize != 0 { + t.Fatalf("expecting no previous data") + } + _, oldSize = m.Set(NeedleId(150088), ToOffset(8), 3000073) + if oldSize != 3000073 { + t.Fatalf("expecting previous data size is %d, not %d", 3000073, oldSize) + } m.Set(NeedleId(150073), ToOffset(8), 3000073) m.Set(NeedleId(150089), ToOffset(8), 3000073) m.Set(NeedleId(150076), ToOffset(8), 3000073) diff --git a/weed/storage/needle_map/memdb.go b/weed/storage/needle_map/memdb.go new file mode 100644 index 000000000..a52d52a10 --- /dev/null +++ b/weed/storage/needle_map/memdb.go @@ -0,0 +1,119 @@ +package needle_map + +import ( + "fmt" + "os" + + "github.com/syndtr/goleveldb/leveldb" + "github.com/syndtr/goleveldb/leveldb/opt" + "github.com/syndtr/goleveldb/leveldb/storage" + + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/storage/idx" + . 
"github.com/chrislusf/seaweedfs/weed/storage/types" + "github.com/chrislusf/seaweedfs/weed/util" +) + +//This map uses in memory level db +type MemDb struct { + db *leveldb.DB +} + +func NewMemDb() *MemDb { + opts := &opt.Options{} + + var err error + t := &MemDb{} + if t.db, err = leveldb.Open(storage.NewMemStorage(), opts); err != nil { + glog.V(0).Infof("MemDb fails to open: %v", err) + return nil + } + + return t +} + +func (cm *MemDb) Set(key NeedleId, offset Offset, size uint32) error { + + bytes := ToBytes(key, offset, size) + + if err := cm.db.Put(bytes[0:NeedleIdSize], bytes[NeedleIdSize:NeedleIdSize+OffsetSize+SizeSize], nil); err != nil { + return fmt.Errorf("failed to write temp leveldb: %v", err) + } + return nil +} + +func (cm *MemDb) Delete(key NeedleId) error { + bytes := make([]byte, NeedleIdSize) + NeedleIdToBytes(bytes, key) + return cm.db.Delete(bytes, nil) + +} +func (cm *MemDb) Get(key NeedleId) (*NeedleValue, bool) { + bytes := make([]byte, NeedleIdSize) + NeedleIdToBytes(bytes[0:NeedleIdSize], key) + data, err := cm.db.Get(bytes, nil) + if err != nil || len(data) != OffsetSize+SizeSize { + return nil, false + } + offset := BytesToOffset(data[0:OffsetSize]) + size := util.BytesToUint32(data[OffsetSize : OffsetSize+SizeSize]) + return &NeedleValue{Key: key, Offset: offset, Size: size}, true +} + +// Visit visits all entries or stop if any error when visiting +func (cm *MemDb) AscendingVisit(visit func(NeedleValue) error) (ret error) { + iter := cm.db.NewIterator(nil, nil) + for iter.Next() { + key := BytesToNeedleId(iter.Key()) + data := iter.Value() + offset := BytesToOffset(data[0:OffsetSize]) + size := util.BytesToUint32(data[OffsetSize : OffsetSize+SizeSize]) + + needle := NeedleValue{Key: key, Offset: offset, Size: size} + ret = visit(needle) + if ret != nil { + return + } + } + iter.Release() + ret = iter.Error() + + return +} + +func (cm *MemDb) SaveToIdx(idxName string) (ret error) { + idxFile, err := os.OpenFile(idxName, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644) + if err != nil { + return + } + defer idxFile.Close() + + return cm.AscendingVisit(func(value NeedleValue) error { + if value.Offset.IsZero() || value.Size == TombstoneFileSize { + return nil + } + _, err := idxFile.Write(value.ToBytes()) + return err + }) + +} + +func (cm *MemDb) LoadFromIdx(idxName string) (ret error) { + idxFile, err := os.OpenFile(idxName, os.O_RDONLY, 0644) + if err != nil { + return + } + defer idxFile.Close() + + return idx.WalkIndexFile(idxFile, func(key NeedleId, offset Offset, size uint32) error { + if offset.IsZero() || size == TombstoneFileSize { + return cm.Delete(key) + } + return cm.Set(key, offset, size) + }) + +} + +func (cm *MemDb) Close() { + cm.db.Close() +} diff --git a/weed/storage/needle_map/memdb_test.go b/weed/storage/needle_map/memdb_test.go new file mode 100644 index 000000000..7b45d23f8 --- /dev/null +++ b/weed/storage/needle_map/memdb_test.go @@ -0,0 +1,23 @@ +package needle_map + +import ( + "testing" + + "github.com/chrislusf/seaweedfs/weed/storage/types" +) + +func BenchmarkMemDb(b *testing.B) { + b.ReportAllocs() + for i := 0; i < b.N; i++ { + nm := NewMemDb() + + nid := types.NeedleId(345) + offset := types.Offset{ + OffsetHigher: types.OffsetHigher{}, + OffsetLower: types.OffsetLower{}, + } + nm.Set(nid, offset, 324) + nm.Close() + } + +} diff --git a/weed/storage/needle_map_leveldb.go b/weed/storage/needle_map_leveldb.go index ef8571e83..3bb258559 100644 --- a/weed/storage/needle_map_leveldb.go +++ b/weed/storage/needle_map_leveldb.go @@ -128,8 
+128,17 @@ func (m *LevelDbNeedleMap) Delete(key NeedleId, offset Offset) error { } func (m *LevelDbNeedleMap) Close() { - m.indexFile.Close() - m.db.Close() + indexFileName := m.indexFile.Name() + if err := m.indexFile.Sync(); err != nil { + glog.Warningf("sync file %s failed: %v", indexFileName, err) + } + if err := m.indexFile.Close(); err != nil { + glog.Warningf("close index file %s failed: %v", indexFileName, err) + } + + if err := m.db.Close(); err != nil { + glog.Warningf("close levelDB failed: %v", err) + } } func (m *LevelDbNeedleMap) Destroy() error { diff --git a/weed/storage/needle_map_memory.go b/weed/storage/needle_map_memory.go index ee639a7e6..84197912f 100644 --- a/weed/storage/needle_map_memory.go +++ b/weed/storage/needle_map_memory.go @@ -22,24 +22,11 @@ func NewCompactNeedleMap(file *os.File) *NeedleMap { return nm } -func NewBtreeNeedleMap(file *os.File) *NeedleMap { - nm := &NeedleMap{ - m: needle_map.NewBtreeMap(), - } - nm.indexFile = file - return nm -} - func LoadCompactNeedleMap(file *os.File) (*NeedleMap, error) { nm := NewCompactNeedleMap(file) return doLoading(file, nm) } -func LoadBtreeNeedleMap(file *os.File) (*NeedleMap, error) { - nm := NewBtreeNeedleMap(file) - return doLoading(file, nm) -} - func doLoading(file *os.File, nm *NeedleMap) (*NeedleMap, error) { e := idx.WalkIndexFile(file, func(key NeedleId, offset Offset, size uint32) error { nm.MaybeSetMaxFileKey(key) @@ -47,14 +34,12 @@ func doLoading(file *os.File, nm *NeedleMap) (*NeedleMap, error) { nm.FileCounter++ nm.FileByteCounter = nm.FileByteCounter + uint64(size) oldOffset, oldSize := nm.m.Set(NeedleId(key), offset, size) - // glog.V(3).Infoln("reading key", key, "offset", offset*NeedlePaddingSize, "size", size, "oldSize", oldSize) if !oldOffset.IsZero() && oldSize != TombstoneFileSize { nm.DeletionCounter++ nm.DeletionByteCounter = nm.DeletionByteCounter + uint64(oldSize) } } else { oldSize := nm.m.Delete(NeedleId(key)) - // glog.V(3).Infoln("removing key", key, "offset", offset*NeedlePaddingSize, "size", size, "oldSize", oldSize) nm.DeletionCounter++ nm.DeletionByteCounter = nm.DeletionByteCounter + uint64(oldSize) } @@ -79,6 +64,10 @@ func (nm *NeedleMap) Delete(key NeedleId, offset Offset) error { return nm.appendToIndexFile(key, offset, TombstoneFileSize) } func (nm *NeedleMap) Close() { + indexFileName := nm.indexFile.Name() + if err := nm.indexFile.Sync(); err != nil { + glog.Warningf("sync file %s failed, %v", indexFileName, err) + } _ = nm.indexFile.Close() } func (nm *NeedleMap) Destroy() error { diff --git a/weed/storage/needle_map_metric_test.go b/weed/storage/needle_map_metric_test.go index 539f83a87..ae2177a30 100644 --- a/weed/storage/needle_map_metric_test.go +++ b/weed/storage/needle_map_metric_test.go @@ -1,17 +1,18 @@ package storage import ( - "github.com/chrislusf/seaweedfs/weed/glog" - . "github.com/chrislusf/seaweedfs/weed/storage/types" "io/ioutil" "math/rand" "testing" + + "github.com/chrislusf/seaweedfs/weed/glog" + . 
"github.com/chrislusf/seaweedfs/weed/storage/types" ) func TestFastLoadingNeedleMapMetrics(t *testing.T) { idxFile, _ := ioutil.TempFile("", "tmp.idx") - nm := NewBtreeNeedleMap(idxFile) + nm := NewCompactNeedleMap(idxFile) for i := 0; i < 10000; i++ { nm.Put(Uint64ToNeedleId(uint64(i+1)), Uint32ToOffset(uint32(0)), uint32(1)) diff --git a/weed/storage/needle_map_sorted_file.go b/weed/storage/needle_map_sorted_file.go new file mode 100644 index 000000000..e6f9258f3 --- /dev/null +++ b/weed/storage/needle_map_sorted_file.go @@ -0,0 +1,105 @@ +package storage + +import ( + "os" + + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/storage/erasure_coding" + "github.com/chrislusf/seaweedfs/weed/storage/needle_map" + . "github.com/chrislusf/seaweedfs/weed/storage/types" +) + +type SortedFileNeedleMap struct { + baseNeedleMapper + baseFileName string + dbFile *os.File + dbFileSize int64 +} + +func NewSortedFileNeedleMap(baseFileName string, indexFile *os.File) (m *SortedFileNeedleMap, err error) { + m = &SortedFileNeedleMap{baseFileName: baseFileName} + m.indexFile = indexFile + fileName := baseFileName + ".sdx" + if !isSortedFileFresh(fileName, indexFile) { + glog.V(0).Infof("Start to Generate %s from %s", fileName, indexFile.Name()) + erasure_coding.WriteSortedFileFromIdx(baseFileName, ".sdx") + glog.V(0).Infof("Finished Generating %s from %s", fileName, indexFile.Name()) + } + glog.V(1).Infof("Opening %s...", fileName) + + if m.dbFile, err = os.Open(baseFileName + ".sdx"); err != nil { + return + } + dbStat, _ := m.dbFile.Stat() + m.dbFileSize = dbStat.Size() + glog.V(1).Infof("Loading %s...", indexFile.Name()) + mm, indexLoadError := newNeedleMapMetricFromIndexFile(indexFile) + if indexLoadError != nil { + return nil, indexLoadError + } + m.mapMetric = *mm + return +} + +func isSortedFileFresh(dbFileName string, indexFile *os.File) bool { + // normally we always write to index file first + dbFile, err := os.Open(dbFileName) + if err != nil { + return false + } + defer dbFile.Close() + dbStat, dbStatErr := dbFile.Stat() + indexStat, indexStatErr := indexFile.Stat() + if dbStatErr != nil || indexStatErr != nil { + glog.V(0).Infof("Can not stat file: %v and %v", dbStatErr, indexStatErr) + return false + } + + return dbStat.ModTime().After(indexStat.ModTime()) +} + +func (m *SortedFileNeedleMap) Get(key NeedleId) (element *needle_map.NeedleValue, ok bool) { + offset, size, err := erasure_coding.SearchNeedleFromSortedIndex(m.dbFile, m.dbFileSize, key, nil) + ok = err == nil + return &needle_map.NeedleValue{Key: key, Offset: offset, Size: size}, ok + +} + +func (m *SortedFileNeedleMap) Put(key NeedleId, offset Offset, size uint32) error { + return os.ErrInvalid +} + +func (m *SortedFileNeedleMap) Delete(key NeedleId, offset Offset) error { + + _, size, err := erasure_coding.SearchNeedleFromSortedIndex(m.dbFile, m.dbFileSize, key, nil) + + if err != nil { + if err == erasure_coding.NotFoundError { + return nil + } + return err + } + + if size == TombstoneFileSize { + return nil + } + + // write to index file first + if err := m.appendToIndexFile(key, offset, TombstoneFileSize); err != nil { + return err + } + _, _, err = erasure_coding.SearchNeedleFromSortedIndex(m.dbFile, m.dbFileSize, key, erasure_coding.MarkNeedleDeleted) + + return err +} + +func (m *SortedFileNeedleMap) Close() { + m.indexFile.Close() + m.dbFile.Close() +} + +func (m *SortedFileNeedleMap) Destroy() error { + m.Close() + os.Remove(m.indexFile.Name()) + return os.Remove(m.baseFileName + ".sdx") +} 
diff --git a/weed/storage/store.go b/weed/storage/store.go index 4d1061bed..e29680f6f 100644 --- a/weed/storage/store.go +++ b/weed/storage/store.go @@ -2,14 +2,19 @@ package storage import ( "fmt" + "path/filepath" + "strings" "sync/atomic" + "google.golang.org/grpc" + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/pb" "github.com/chrislusf/seaweedfs/weed/pb/master_pb" "github.com/chrislusf/seaweedfs/weed/stats" "github.com/chrislusf/seaweedfs/weed/storage/needle" + "github.com/chrislusf/seaweedfs/weed/storage/super_block" . "github.com/chrislusf/seaweedfs/weed/storage/types" - "google.golang.org/grpc" ) const ( @@ -60,7 +65,7 @@ func NewStore(grpcDialOption grpc.DialOption, port int, ip, publicUrl string, di return } func (s *Store) AddVolume(volumeId needle.VolumeId, collection string, needleMapKind NeedleMapType, replicaPlacement string, ttlString string, preallocate int64, MemoryMapMaxSizeMb uint32) error { - rt, e := NewReplicaPlacementFromString(replicaPlacement) + rt, e := super_block.NewReplicaPlacementFromString(replicaPlacement) if e != nil { return e } @@ -101,7 +106,7 @@ func (s *Store) FindFreeLocation() (ret *DiskLocation) { } return ret } -func (s *Store) addVolume(vid needle.VolumeId, collection string, needleMapKind NeedleMapType, replicaPlacement *ReplicaPlacement, ttl *needle.TTL, preallocate int64, memoryMapMaxSizeMb uint32) error { +func (s *Store) addVolume(vid needle.VolumeId, collection string, needleMapKind NeedleMapType, replicaPlacement *super_block.ReplicaPlacement, ttl *needle.TTL, preallocate int64, memoryMapMaxSizeMb uint32) error { if s.findVolume(vid) != nil { return fmt.Errorf("Volume Id %d already exists!", vid) } @@ -126,10 +131,10 @@ func (s *Store) addVolume(vid needle.VolumeId, collection string, needleMapKind return fmt.Errorf("No more free space left") } -func (s *Store) Status() []*VolumeInfo { +func (s *Store) VolumeInfos() []*VolumeInfo { var stats []*VolumeInfo for _, location := range s.Locations { - location.RLock() + location.volumesLock.RLock() for k, v := range location.volumes { s := &VolumeInfo{ Id: needle.VolumeId(k), @@ -140,13 +145,14 @@ func (s *Store) Status() []*VolumeInfo { FileCount: int(v.FileCount()), DeleteCount: int(v.DeletedCount()), DeletedByteCount: v.DeletedSize(), - ReadOnly: v.readOnly, + ReadOnly: v.noWriteOrDelete || v.noWriteCanDelete, Ttl: v.Ttl, CompactRevision: uint32(v.CompactionRevision), } + s.RemoteStorageName, s.RemoteStorageKey = v.RemoteStorageNameKey() stats = append(stats, s) } - location.RUnlock() + location.volumesLock.RUnlock() } sortVolumeInfos(stats) return stats @@ -167,7 +173,7 @@ func (s *Store) CollectHeartbeat() *master_pb.Heartbeat { for _, location := range s.Locations { var deleteVids []needle.VolumeId maxVolumeCount = maxVolumeCount + location.MaxVolumeCount - location.RLock() + location.volumesLock.RLock() for _, v := range location.volumes { if maxFileKey < v.MaxFileKey() { maxFileKey = v.MaxFileKey() @@ -184,16 +190,16 @@ func (s *Store) CollectHeartbeat() *master_pb.Heartbeat { fileSize, _, _ := v.FileStat() collectionVolumeSize[v.Collection] += fileSize } - location.RUnlock() + location.volumesLock.RUnlock() if len(deleteVids) > 0 { // delete expired volumes. 
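// deleteVids was gathered above while holding only volumesLock.RLock();
// the write lock below must be re-acquired before deleteVolumeById mutates the map.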
- location.Lock() + location.volumesLock.Lock() for _, vid := range deleteVids { location.deleteVolumeById(vid) glog.V(0).Infoln("volume", vid, "is deleted.") } - location.Unlock() + location.volumesLock.Unlock() } } @@ -223,11 +229,11 @@ func (s *Store) Close() { func (s *Store) WriteVolumeNeedle(i needle.VolumeId, n *needle.Needle) (size uint32, isUnchanged bool, err error) { if v := s.findVolume(i); v != nil { - if v.readOnly { + if v.noWriteOrDelete || v.noWriteCanDelete { err = fmt.Errorf("volume %d is read only", i) return } - if MaxPossibleVolumeSize >= v.ContentSize()+uint64(needle.GetActualSize(size, v.version)) { + if MaxPossibleVolumeSize >= v.ContentSize()+uint64(needle.GetActualSize(size, v.Version())) { _, size, isUnchanged, err = v.writeNeedle(n) } else { err = fmt.Errorf("volume size limit %d exceeded! current size is %d", s.GetVolumeSizeLimit(), v.ContentSize()) @@ -241,10 +247,10 @@ func (s *Store) WriteVolumeNeedle(i needle.VolumeId, n *needle.Needle) (size uin func (s *Store) DeleteVolumeNeedle(i needle.VolumeId, n *needle.Needle) (uint32, error) { if v := s.findVolume(i); v != nil { - if v.readOnly { + if v.noWriteOrDelete { return 0, fmt.Errorf("volume %d is read only", i) } - if MaxPossibleVolumeSize >= v.ContentSize()+uint64(needle.GetActualSize(0, v.version)) { + if MaxPossibleVolumeSize >= v.ContentSize()+uint64(needle.GetActualSize(0, v.Version())) { return v.deleteNeedle(n) } else { return 0, fmt.Errorf("volume size limit %d exceeded! current size is %d", s.GetVolumeSizeLimit(), v.ContentSize()) @@ -273,7 +279,7 @@ func (s *Store) MarkVolumeReadonly(i needle.VolumeId) error { if v == nil { return fmt.Errorf("volume %d not found", i) } - v.readOnly = true + v.noWriteOrDelete = true return nil } @@ -343,6 +349,31 @@ func (s *Store) DeleteVolume(i needle.VolumeId) error { return fmt.Errorf("volume %d not found on disk", i) } +func (s *Store) ConfigureVolume(i needle.VolumeId, replication string) error { + + for _, location := range s.Locations { + fileInfo, found := location.LocateVolume(i) + if !found { + continue + } + // load, modify, save + baseFileName := strings.TrimSuffix(fileInfo.Name(), filepath.Ext(fileInfo.Name())) + vifFile := filepath.Join(location.Directory, baseFileName+".vif") + volumeInfo, _, err := pb.MaybeLoadVolumeInfo(vifFile) + if err != nil { + return fmt.Errorf("volume %d fail to load vif", i) + } + volumeInfo.Replication = replication + err = pb.SaveVolumeInfo(vifFile, volumeInfo) + if err != nil { + return fmt.Errorf("volume %d fail to save vif", i) + } + return nil + } + + return fmt.Errorf("volume %d not found on disk", i) +} + func (s *Store) SetVolumeSizeLimit(x uint64) { atomic.StoreUint64(&s.volumeSizeLimit, x) } diff --git a/weed/storage/store_ec.go b/weed/storage/store_ec.go index 7e3f1a46c..e423e7dca 100644 --- a/weed/storage/store_ec.go +++ b/weed/storage/store_ec.go @@ -8,6 +8,8 @@ import ( "sync" "time" + "github.com/klauspost/reedsolomon" + "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/operation" "github.com/chrislusf/seaweedfs/weed/pb/master_pb" @@ -16,7 +18,6 @@ import ( "github.com/chrislusf/seaweedfs/weed/storage/erasure_coding" "github.com/chrislusf/seaweedfs/weed/storage/needle" "github.com/chrislusf/seaweedfs/weed/storage/types" - "github.com/klauspost/reedsolomon" ) func (s *Store) CollectErasureCodingHeartbeat() *master_pb.Heartbeat { @@ -115,19 +116,11 @@ func (s *Store) DestroyEcVolume(vid needle.VolumeId) { } } -func (s *Store) ReadEcShardNeedle(ctx context.Context, vid 
needle.VolumeId, n *needle.Needle) (int, error) { +func (s *Store) ReadEcShardNeedle(vid needle.VolumeId, n *needle.Needle) (int, error) { for _, location := range s.Locations { if localEcVolume, found := location.FindEcVolume(vid); found { - // read the volume version - for localEcVolume.Version == 0 { - err := s.readEcVolumeVersion(ctx, vid, localEcVolume) - time.Sleep(1357 * time.Millisecond) - glog.V(0).Infof("ReadEcShardNeedle vid %d version:%v: %v", vid, localEcVolume.Version, err) - } - version := localEcVolume.Version - - offset, size, intervals, err := localEcVolume.LocateEcShardNeedle(n.Id, version) + offset, size, intervals, err := localEcVolume.LocateEcShardNeedle(n.Id, localEcVolume.Version) if err != nil { return 0, fmt.Errorf("locate in local ec volume: %v", err) } @@ -140,7 +133,7 @@ func (s *Store) ReadEcShardNeedle(ctx context.Context, vid needle.VolumeId, n *n if len(intervals) > 1 { glog.V(3).Infof("ReadEcShardNeedle needle id %s intervals:%+v", n.String(), intervals) } - bytes, isDeleted, err := s.readEcShardIntervals(ctx, vid, n.Id, localEcVolume, intervals) + bytes, isDeleted, err := s.readEcShardIntervals(vid, n.Id, localEcVolume, intervals) if err != nil { return 0, fmt.Errorf("ReadEcShardIntervals: %v", err) } @@ -148,7 +141,7 @@ func (s *Store) ReadEcShardNeedle(ctx context.Context, vid needle.VolumeId, n *n return 0, fmt.Errorf("ec entry %s is deleted", n.Id) } - err = n.ReadBytes(bytes, offset.ToAcutalOffset(), size, version) + err = n.ReadBytes(bytes, offset.ToAcutalOffset(), size, localEcVolume.Version) if err != nil { return 0, fmt.Errorf("readbytes: %v", err) } @@ -159,30 +152,14 @@ func (s *Store) ReadEcShardNeedle(ctx context.Context, vid needle.VolumeId, n *n return 0, fmt.Errorf("ec shard %d not found", vid) } -func (s *Store) readEcVolumeVersion(ctx context.Context, vid needle.VolumeId, ecVolume *erasure_coding.EcVolume) (err error) { - - interval := erasure_coding.Interval{ - BlockIndex: 0, - InnerBlockOffset: 0, - Size: _SuperBlockSize, - IsLargeBlock: true, // it could be large block, but ok in this place - LargeBlockRowsCount: 0, - } - data, _, err := s.readEcShardIntervals(ctx, vid, 0, ecVolume, []erasure_coding.Interval{interval}) - if err == nil { - ecVolume.Version = needle.Version(data[0]) - } - return -} - -func (s *Store) readEcShardIntervals(ctx context.Context, vid needle.VolumeId, needleId types.NeedleId, ecVolume *erasure_coding.EcVolume, intervals []erasure_coding.Interval) (data []byte, is_deleted bool, err error) { +func (s *Store) readEcShardIntervals(vid needle.VolumeId, needleId types.NeedleId, ecVolume *erasure_coding.EcVolume, intervals []erasure_coding.Interval) (data []byte, is_deleted bool, err error) { - if err = s.cachedLookupEcShardLocations(ctx, ecVolume); err != nil { + if err = s.cachedLookupEcShardLocations(ecVolume); err != nil { return nil, false, fmt.Errorf("failed to locate shard via master grpc %s: %v", s.MasterAddress, err) } for i, interval := range intervals { - if d, isDeleted, e := s.readOneEcShardInterval(ctx, needleId, ecVolume, interval); e != nil { + if d, isDeleted, e := s.readOneEcShardInterval(needleId, ecVolume, interval); e != nil { return nil, isDeleted, e } else { if isDeleted { @@ -198,7 +175,7 @@ func (s *Store) readEcShardIntervals(ctx context.Context, vid needle.VolumeId, n return } -func (s *Store) readOneEcShardInterval(ctx context.Context, needleId types.NeedleId, ecVolume *erasure_coding.EcVolume, interval erasure_coding.Interval) (data []byte, is_deleted bool, err error) { +func (s *Store) 
readOneEcShardInterval(needleId types.NeedleId, ecVolume *erasure_coding.EcVolume, interval erasure_coding.Interval) (data []byte, is_deleted bool, err error) { shardId, actualOffset := interval.ToShardIdAndOffset(erasure_coding.ErasureCodingLargeBlockSize, erasure_coding.ErasureCodingSmallBlockSize) data = make([]byte, interval.Size) if shard, found := ecVolume.FindEcVolumeShard(shardId); found { @@ -213,7 +190,7 @@ func (s *Store) readOneEcShardInterval(ctx context.Context, needleId types.Needl // try reading directly if hasShardIdLocation { - _, is_deleted, err = s.readRemoteEcShardInterval(ctx, sourceDataNodes, needleId, ecVolume.VolumeId, shardId, data, actualOffset) + _, is_deleted, err = s.readRemoteEcShardInterval(sourceDataNodes, needleId, ecVolume.VolumeId, shardId, data, actualOffset) if err == nil { return } @@ -222,7 +199,7 @@ func (s *Store) readOneEcShardInterval(ctx context.Context, needleId types.Needl } // try reading by recovering from other shards - _, is_deleted, err = s.recoverOneRemoteEcShardInterval(ctx, needleId, ecVolume, shardId, data, actualOffset) + _, is_deleted, err = s.recoverOneRemoteEcShardInterval(needleId, ecVolume, shardId, data, actualOffset) if err == nil { return } @@ -238,7 +215,7 @@ func forgetShardId(ecVolume *erasure_coding.EcVolume, shardId erasure_coding.Sha ecVolume.ShardLocationsLock.Unlock() } -func (s *Store) cachedLookupEcShardLocations(ctx context.Context, ecVolume *erasure_coding.EcVolume) (err error) { +func (s *Store) cachedLookupEcShardLocations(ecVolume *erasure_coding.EcVolume) (err error) { shardCount := len(ecVolume.ShardLocations) if shardCount < erasure_coding.DataShardsCount && @@ -257,7 +234,7 @@ func (s *Store) cachedLookupEcShardLocations(ctx context.Context, ecVolume *eras req := &master_pb.LookupEcVolumeRequest{ VolumeId: uint32(ecVolume.VolumeId), } - resp, err := masterClient.LookupEcVolume(ctx, req) + resp, err := masterClient.LookupEcVolume(context.Background(), req) if err != nil { return fmt.Errorf("lookup ec volume %d: %v", ecVolume.VolumeId, err) } @@ -281,7 +258,7 @@ func (s *Store) cachedLookupEcShardLocations(ctx context.Context, ecVolume *eras return } -func (s *Store) readRemoteEcShardInterval(ctx context.Context, sourceDataNodes []string, needleId types.NeedleId, vid needle.VolumeId, shardId erasure_coding.ShardId, buf []byte, offset int64) (n int, is_deleted bool, err error) { +func (s *Store) readRemoteEcShardInterval(sourceDataNodes []string, needleId types.NeedleId, vid needle.VolumeId, shardId erasure_coding.ShardId, buf []byte, offset int64) (n int, is_deleted bool, err error) { if len(sourceDataNodes) == 0 { return 0, false, fmt.Errorf("failed to find ec shard %d.%d", vid, shardId) @@ -289,7 +266,7 @@ func (s *Store) readRemoteEcShardInterval(ctx context.Context, sourceDataNodes [ for _, sourceDataNode := range sourceDataNodes { glog.V(3).Infof("read remote ec shard %d.%d from %s", vid, shardId, sourceDataNode) - n, is_deleted, err = s.doReadRemoteEcShardInterval(ctx, sourceDataNode, needleId, vid, shardId, buf, offset) + n, is_deleted, err = s.doReadRemoteEcShardInterval(sourceDataNode, needleId, vid, shardId, buf, offset) if err == nil { return } @@ -299,12 +276,12 @@ func (s *Store) readRemoteEcShardInterval(ctx context.Context, sourceDataNodes [ return } -func (s *Store) doReadRemoteEcShardInterval(ctx context.Context, sourceDataNode string, needleId types.NeedleId, vid needle.VolumeId, shardId erasure_coding.ShardId, buf []byte, offset int64) (n int, is_deleted bool, err error) { +func (s *Store) 
doReadRemoteEcShardInterval(sourceDataNode string, needleId types.NeedleId, vid needle.VolumeId, shardId erasure_coding.ShardId, buf []byte, offset int64) (n int, is_deleted bool, err error) { err = operation.WithVolumeServerClient(sourceDataNode, s.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error { // copy data slice - shardReadClient, err := client.VolumeEcShardRead(ctx, &volume_server_pb.VolumeEcShardReadRequest{ + shardReadClient, err := client.VolumeEcShardRead(context.Background(), &volume_server_pb.VolumeEcShardReadRequest{ VolumeId: uint32(vid), ShardId: uint32(shardId), Offset: offset, @@ -339,7 +316,7 @@ func (s *Store) doReadRemoteEcShardInterval(ctx context.Context, sourceDataNode return } -func (s *Store) recoverOneRemoteEcShardInterval(ctx context.Context, needleId types.NeedleId, ecVolume *erasure_coding.EcVolume, shardIdToRecover erasure_coding.ShardId, buf []byte, offset int64) (n int, is_deleted bool, err error) { +func (s *Store) recoverOneRemoteEcShardInterval(needleId types.NeedleId, ecVolume *erasure_coding.EcVolume, shardIdToRecover erasure_coding.ShardId, buf []byte, offset int64) (n int, is_deleted bool, err error) { glog.V(3).Infof("recover ec shard %d.%d from other locations", ecVolume.VolumeId, shardIdToRecover) enc, err := reedsolomon.New(erasure_coding.DataShardsCount, erasure_coding.ParityShardsCount) @@ -367,7 +344,7 @@ func (s *Store) recoverOneRemoteEcShardInterval(ctx context.Context, needleId ty go func(shardId erasure_coding.ShardId, locations []string) { defer wg.Done() data := make([]byte, len(buf)) - nRead, isDeleted, readErr := s.readRemoteEcShardInterval(ctx, locations, needleId, ecVolume.VolumeId, shardId, data, offset) + nRead, isDeleted, readErr := s.readRemoteEcShardInterval(locations, needleId, ecVolume.VolumeId, shardId, data, offset) if readErr != nil { glog.V(3).Infof("recover: readRemoteEcShardInterval %d.%d %d bytes from %+v: %v", ecVolume.VolumeId, shardId, nRead, locations, readErr) forgetShardId(ecVolume, shardId) diff --git a/weed/storage/store_ec_delete.go b/weed/storage/store_ec_delete.go index e027d2887..4a75fb20b 100644 --- a/weed/storage/store_ec_delete.go +++ b/weed/storage/store_ec_delete.go @@ -12,9 +12,9 @@ import ( "github.com/chrislusf/seaweedfs/weed/storage/types" ) -func (s *Store) DeleteEcShardNeedle(ctx context.Context, ecVolume *erasure_coding.EcVolume, n *needle.Needle, cookie types.Cookie) (int64, error) { +func (s *Store) DeleteEcShardNeedle(ecVolume *erasure_coding.EcVolume, n *needle.Needle, cookie types.Cookie) (int64, error) { - count, err := s.ReadEcShardNeedle(ctx, ecVolume.VolumeId, n) + count, err := s.ReadEcShardNeedle(ecVolume.VolumeId, n) if err != nil { return 0, err @@ -24,7 +24,7 @@ func (s *Store) DeleteEcShardNeedle(ctx context.Context, ecVolume *erasure_codin return 0, fmt.Errorf("unexpected cookie %x", cookie) } - if err = s.doDeleteNeedleFromAtLeastOneRemoteEcShards(ctx, ecVolume, n.Id); err != nil { + if err = s.doDeleteNeedleFromAtLeastOneRemoteEcShards(ecVolume, n.Id); err != nil { return 0, err } @@ -32,7 +32,7 @@ func (s *Store) DeleteEcShardNeedle(ctx context.Context, ecVolume *erasure_codin } -func (s *Store) doDeleteNeedleFromAtLeastOneRemoteEcShards(ctx context.Context, ecVolume *erasure_coding.EcVolume, needleId types.NeedleId) error { +func (s *Store) doDeleteNeedleFromAtLeastOneRemoteEcShards(ecVolume *erasure_coding.EcVolume, needleId types.NeedleId) error { _, _, intervals, err := ecVolume.LocateEcShardNeedle(needleId, ecVolume.Version) @@ -43,13 +43,13 @@ func 
(s *Store) doDeleteNeedleFromAtLeastOneRemoteEcShards(ctx context.Context, shardId, _ := intervals[0].ToShardIdAndOffset(erasure_coding.ErasureCodingLargeBlockSize, erasure_coding.ErasureCodingSmallBlockSize) hasDeletionSuccess := false - err = s.doDeleteNeedleFromRemoteEcShardServers(ctx, shardId, ecVolume, needleId) + err = s.doDeleteNeedleFromRemoteEcShardServers(shardId, ecVolume, needleId) if err == nil { hasDeletionSuccess = true } for shardId = erasure_coding.DataShardsCount; shardId < erasure_coding.TotalShardsCount; shardId++ { - if parityDeletionError := s.doDeleteNeedleFromRemoteEcShardServers(ctx, shardId, ecVolume, needleId); parityDeletionError == nil { + if parityDeletionError := s.doDeleteNeedleFromRemoteEcShardServers(shardId, ecVolume, needleId); parityDeletionError == nil { hasDeletionSuccess = true } } @@ -62,7 +62,7 @@ func (s *Store) doDeleteNeedleFromAtLeastOneRemoteEcShards(ctx context.Context, } -func (s *Store) doDeleteNeedleFromRemoteEcShardServers(ctx context.Context, shardId erasure_coding.ShardId, ecVolume *erasure_coding.EcVolume, needleId types.NeedleId) error { +func (s *Store) doDeleteNeedleFromRemoteEcShardServers(shardId erasure_coding.ShardId, ecVolume *erasure_coding.EcVolume, needleId types.NeedleId) error { ecVolume.ShardLocationsLock.RLock() sourceDataNodes, hasShardLocations := ecVolume.ShardLocations[shardId] @@ -74,7 +74,7 @@ func (s *Store) doDeleteNeedleFromRemoteEcShardServers(ctx context.Context, shar for _, sourceDataNode := range sourceDataNodes { glog.V(4).Infof("delete from remote ec shard %d.%d from %s", ecVolume.VolumeId, shardId, sourceDataNode) - err := s.doDeleteNeedleFromRemoteEcShard(ctx, sourceDataNode, ecVolume.VolumeId, ecVolume.Collection, ecVolume.Version, needleId) + err := s.doDeleteNeedleFromRemoteEcShard(sourceDataNode, ecVolume.VolumeId, ecVolume.Collection, ecVolume.Version, needleId) if err != nil { return err } @@ -85,12 +85,12 @@ func (s *Store) doDeleteNeedleFromRemoteEcShardServers(ctx context.Context, shar } -func (s *Store) doDeleteNeedleFromRemoteEcShard(ctx context.Context, sourceDataNode string, vid needle.VolumeId, collection string, version needle.Version, needleId types.NeedleId) error { +func (s *Store) doDeleteNeedleFromRemoteEcShard(sourceDataNode string, vid needle.VolumeId, collection string, version needle.Version, needleId types.NeedleId) error { return operation.WithVolumeServerClient(sourceDataNode, s.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error { // copy data slice - _, err := client.VolumeEcBlobDelete(ctx, &volume_server_pb.VolumeEcBlobDeleteRequest{ + _, err := client.VolumeEcBlobDelete(context.Background(), &volume_server_pb.VolumeEcBlobDeleteRequest{ VolumeId: uint32(vid), Collection: collection, FileKey: uint64(needleId), diff --git a/weed/storage/store_vacuum.go b/weed/storage/store_vacuum.go index b1f1a6277..e94d9b516 100644 --- a/weed/storage/store_vacuum.go +++ b/weed/storage/store_vacuum.go @@ -16,7 +16,8 @@ func (s *Store) CheckCompactVolume(volumeId needle.VolumeId) (float64, error) { } func (s *Store) CompactVolume(vid needle.VolumeId, preallocate int64, compactionBytePerSecond int64) error { if v := s.findVolume(vid); v != nil { - return v.Compact(preallocate, compactionBytePerSecond) + return v.Compact2(preallocate) // compactionBytePerSecond + // return v.Compact(preallocate, compactionBytePerSecond) } return fmt.Errorf("volume id %d is not found during compact", vid) } diff --git a/weed/storage/replica_placement.go 
b/weed/storage/super_block/replica_placement.go index c1aca52eb..fcccbba7d 100644 --- a/weed/storage/replica_placement.go +++ b/weed/storage/super_block/replica_placement.go @@ -1,4 +1,4 @@ -package storage +package super_block import ( "errors" diff --git a/weed/storage/replica_placement_test.go b/weed/storage/super_block/replica_placement_test.go index 7968af7cb..7742ba548 100644 --- a/weed/storage/replica_placement_test.go +++ b/weed/storage/super_block/replica_placement_test.go @@ -1,4 +1,4 @@ -package storage +package super_block import ( "testing" diff --git a/weed/storage/super_block/super_block.go b/weed/storage/super_block/super_block.go new file mode 100644 index 000000000..f48cd0bdc --- /dev/null +++ b/weed/storage/super_block/super_block.go @@ -0,0 +1,69 @@ +package super_block + +import ( + "github.com/golang/protobuf/proto" + + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/pb/master_pb" + "github.com/chrislusf/seaweedfs/weed/storage/needle" + "github.com/chrislusf/seaweedfs/weed/util" +) + +const ( + SuperBlockSize = 8 +) + +/* +* Super block currently has 8 bytes allocated for each volume. +* Byte 0: version, 1 or 2 +* Byte 1: Replica Placement strategy, 000, 001, 002, 010, etc +* Byte 2 and byte 3: Time to live. See TTL for definition +* Byte 4 and byte 5: The number of times the volume has been compacted. +* Rest bytes: Reserved + */ +type SuperBlock struct { + Version needle.Version + ReplicaPlacement *ReplicaPlacement + Ttl *needle.TTL + CompactionRevision uint16 + Extra *master_pb.SuperBlockExtra + ExtraSize uint16 +} + +func (s *SuperBlock) BlockSize() int { + switch s.Version { + case needle.Version2, needle.Version3: + return SuperBlockSize + int(s.ExtraSize) + } + return SuperBlockSize +} + +func (s *SuperBlock) Bytes() []byte { + header := make([]byte, SuperBlockSize) + header[0] = byte(s.Version) + header[1] = s.ReplicaPlacement.Byte() + s.Ttl.ToBytes(header[2:4]) + util.Uint16toBytes(header[4:6], s.CompactionRevision) + + if s.Extra != nil { + extraData, err := proto.Marshal(s.Extra) + if err != nil { + glog.Fatalf("cannot marshal super block extra %+v: %v", s.Extra, err) + } + extraSize := len(extraData) + if extraSize > 256*256-2 { + // reserve a couple of bits for future extension + glog.Fatalf("super block extra size is %d bigger than %d", extraSize, 256*256-2) + } + s.ExtraSize = uint16(extraSize) + util.Uint16toBytes(header[6:8], s.ExtraSize) + + header = append(header, extraData...) 
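// the marshaled extra block rides immediately after the fixed 8-byte header;
// readers recover its length from the ExtraSize value written at header[6:8] above.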
+ } + + return header +} + +func (s *SuperBlock) Initialized() bool { + return s.ReplicaPlacement != nil && s.Ttl != nil +} diff --git a/weed/storage/super_block/super_block_read.go.go b/weed/storage/super_block/super_block_read.go.go new file mode 100644 index 000000000..9eb12e116 --- /dev/null +++ b/weed/storage/super_block/super_block_read.go.go @@ -0,0 +1,44 @@ +package super_block + +import ( + "fmt" + + "github.com/golang/protobuf/proto" + + "github.com/chrislusf/seaweedfs/weed/pb/master_pb" + "github.com/chrislusf/seaweedfs/weed/storage/backend" + "github.com/chrislusf/seaweedfs/weed/storage/needle" + "github.com/chrislusf/seaweedfs/weed/util" +) + +// ReadSuperBlock reads from data file and load it into volume's super block +func ReadSuperBlock(datBackend backend.BackendStorageFile) (superBlock SuperBlock, err error) { + + header := make([]byte, SuperBlockSize) + if _, e := datBackend.ReadAt(header, 0); e != nil { + err = fmt.Errorf("cannot read volume %s super block: %v", datBackend.Name(), e) + return + } + + superBlock.Version = needle.Version(header[0]) + if superBlock.ReplicaPlacement, err = NewReplicaPlacementFromByte(header[1]); err != nil { + err = fmt.Errorf("cannot read replica type: %s", err.Error()) + return + } + superBlock.Ttl = needle.LoadTTLFromBytes(header[2:4]) + superBlock.CompactionRevision = util.BytesToUint16(header[4:6]) + superBlock.ExtraSize = util.BytesToUint16(header[6:8]) + + if superBlock.ExtraSize > 0 { + // read more + extraData := make([]byte, int(superBlock.ExtraSize)) + superBlock.Extra = &master_pb.SuperBlockExtra{} + err = proto.Unmarshal(extraData, superBlock.Extra) + if err != nil { + err = fmt.Errorf("cannot read volume %s super block extra: %v", datBackend.Name(), err) + return + } + } + + return +} diff --git a/weed/storage/volume_super_block_test.go b/weed/storage/super_block/super_block_test.go index 06ad8a5d3..25699070d 100644 --- a/weed/storage/volume_super_block_test.go +++ b/weed/storage/super_block/super_block_test.go @@ -1,4 +1,4 @@ -package storage +package super_block import ( "testing" @@ -10,7 +10,7 @@ func TestSuperBlockReadWrite(t *testing.T) { rp, _ := NewReplicaPlacementFromByte(byte(001)) ttl, _ := needle.ReadTTL("15d") s := &SuperBlock{ - version: needle.CurrentVersion, + Version: needle.CurrentVersion, ReplicaPlacement: rp, Ttl: ttl, } diff --git a/weed/storage/volume.go b/weed/storage/volume.go index e85696eab..7da83de7a 100644 --- a/weed/storage/volume.go +++ b/weed/storage/volume.go @@ -2,18 +2,19 @@ package storage import ( "fmt" + "path" + "strconv" + "sync" + "time" "github.com/chrislusf/seaweedfs/weed/pb/master_pb" + "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb" "github.com/chrislusf/seaweedfs/weed/stats" "github.com/chrislusf/seaweedfs/weed/storage/backend" "github.com/chrislusf/seaweedfs/weed/storage/needle" + "github.com/chrislusf/seaweedfs/weed/storage/super_block" "github.com/chrislusf/seaweedfs/weed/storage/types" - "path" - "strconv" - "sync" - "time" - "github.com/chrislusf/seaweedfs/weed/glog" ) @@ -21,15 +22,17 @@ type Volume struct { Id needle.VolumeId dir string Collection string - DataBackend backend.DataStorageBackend + DataBackend backend.BackendStorageFile nm NeedleMapper needleMapKind NeedleMapType - readOnly bool + noWriteOrDelete bool // if readonly, either noWriteOrDelete or noWriteCanDelete + noWriteCanDelete bool // if readonly, either noWriteOrDelete or noWriteCanDelete + hasRemoteFile bool // if the volume has a remote file MemoryMapMaxSizeMb uint32 - SuperBlock + 
super_block.SuperBlock - dataFileAccessLock sync.Mutex + dataFileAccessLock sync.RWMutex lastModifiedTsSeconds uint64 //unix time in seconds lastAppendAtNs uint64 //unix time in nanoseconds @@ -37,18 +40,20 @@ type Volume struct { lastCompactRevision uint16 isCompacting bool + + volumeInfo *volume_server_pb.VolumeInfo } -func NewVolume(dirname string, collection string, id needle.VolumeId, needleMapKind NeedleMapType, replicaPlacement *ReplicaPlacement, ttl *needle.TTL, preallocate int64, memoryMapMaxSizeMb uint32) (v *Volume, e error) { +func NewVolume(dirname string, collection string, id needle.VolumeId, needleMapKind NeedleMapType, replicaPlacement *super_block.ReplicaPlacement, ttl *needle.TTL, preallocate int64, memoryMapMaxSizeMb uint32) (v *Volume, e error) { // if replicaPlacement is nil, the superblock will be loaded from disk v = &Volume{dir: dirname, Collection: collection, Id: id, MemoryMapMaxSizeMb: memoryMapMaxSizeMb} - v.SuperBlock = SuperBlock{ReplicaPlacement: replicaPlacement, Ttl: ttl} + v.SuperBlock = super_block.SuperBlock{ReplicaPlacement: replicaPlacement, Ttl: ttl} v.needleMapKind = needleMapKind e = v.load(true, true, needleMapKind, preallocate) return } func (v *Volume) String() string { - return fmt.Sprintf("Id:%v, dir:%s, Collection:%s, dataFile:%v, nm:%v, readOnly:%v", v.Id, v.dir, v.Collection, v.DataBackend, v.nm, v.readOnly) + return fmt.Sprintf("Id:%v, dir:%s, Collection:%s, dataFile:%v, nm:%v, noWrite:%v canDelete:%v", v.Id, v.dir, v.Collection, v.DataBackend, v.nm, v.noWriteOrDelete || v.noWriteCanDelete, v.noWriteCanDelete) } func VolumeFileName(dir string, collection string, id int) (fileName string) { @@ -65,12 +70,15 @@ func (v *Volume) FileName() (fileName string) { } func (v *Volume) Version() needle.Version { - return v.SuperBlock.Version() + if v.volumeInfo.Version != 0 { + v.SuperBlock.Version = needle.Version(v.volumeInfo.Version) + } + return v.SuperBlock.Version } func (v *Volume) FileStat() (datSize uint64, idxSize uint64, modTime time.Time) { - v.dataFileAccessLock.Lock() - defer v.dataFileAccessLock.Unlock() + v.dataFileAccessLock.RLock() + defer v.dataFileAccessLock.RUnlock() if v.DataBackend == nil { return @@ -80,13 +88,13 @@ func (v *Volume) FileStat() (datSize uint64, idxSize uint64, modTime time.Time) if e == nil { return uint64(datFileSize), v.nm.IndexFileSize(), modTime } - glog.V(0).Infof("Failed to read file size %s %v", v.DataBackend.String(), e) + glog.V(0).Infof("Failed to read file size %s %v", v.DataBackend.Name(), e) return // -1 causes integer overflow and the volume to become unwritable. 
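// FileStat and the accessors below now take dataFileAccessLock.RLock(),
// so concurrent stat reads no longer serialize behind a single exclusive mutex.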
} func (v *Volume) ContentSize() uint64 { - v.dataFileAccessLock.Lock() - defer v.dataFileAccessLock.Unlock() + v.dataFileAccessLock.RLock() + defer v.dataFileAccessLock.RUnlock() if v.nm == nil { return 0 } @@ -94,8 +102,8 @@ func (v *Volume) ContentSize() uint64 { } func (v *Volume) DeletedSize() uint64 { - v.dataFileAccessLock.Lock() - defer v.dataFileAccessLock.Unlock() + v.dataFileAccessLock.RLock() + defer v.dataFileAccessLock.RUnlock() if v.nm == nil { return 0 } @@ -103,8 +111,8 @@ func (v *Volume) DeletedSize() uint64 { } func (v *Volume) FileCount() uint64 { - v.dataFileAccessLock.Lock() - defer v.dataFileAccessLock.Unlock() + v.dataFileAccessLock.RLock() + defer v.dataFileAccessLock.RUnlock() if v.nm == nil { return 0 } @@ -112,8 +120,8 @@ func (v *Volume) FileCount() uint64 { } func (v *Volume) DeletedCount() uint64 { - v.dataFileAccessLock.Lock() - defer v.dataFileAccessLock.Unlock() + v.dataFileAccessLock.RLock() + defer v.dataFileAccessLock.RUnlock() if v.nm == nil { return 0 } @@ -121,8 +129,8 @@ func (v *Volume) DeletedCount() uint64 { } func (v *Volume) MaxFileKey() types.NeedleId { - v.dataFileAccessLock.Lock() - defer v.dataFileAccessLock.Unlock() + v.dataFileAccessLock.RLock() + defer v.dataFileAccessLock.RUnlock() if v.nm == nil { return 0 } @@ -130,8 +138,8 @@ func (v *Volume) MaxFileKey() types.NeedleId { } func (v *Volume) IndexFileSize() uint64 { - v.dataFileAccessLock.Lock() - defer v.dataFileAccessLock.Unlock() + v.dataFileAccessLock.RLock() + defer v.dataFileAccessLock.RUnlock() if v.nm == nil { return 0 } @@ -172,9 +180,9 @@ func (v *Volume) expired(volumeSizeLimit uint64) bool { if v.Ttl == nil || v.Ttl.Minutes() == 0 { return false } - glog.V(1).Infof("now:%v lastModified:%v", time.Now().Unix(), v.lastModifiedTsSeconds) + glog.V(2).Infof("now:%v lastModified:%v", time.Now().Unix(), v.lastModifiedTsSeconds) livedMinutes := (time.Now().Unix() - int64(v.lastModifiedTsSeconds)) / 60 - glog.V(1).Infof("ttl:%v lived:%v", v.Ttl, livedMinutes) + glog.V(2).Infof("ttl:%v lived:%v", v.Ttl, livedMinutes) if int64(v.Ttl.Minutes()) < livedMinutes { return true } @@ -200,18 +208,32 @@ func (v *Volume) expiredLongEnough(maxDelayMinutes uint32) bool { func (v *Volume) ToVolumeInformationMessage() *master_pb.VolumeInformationMessage { size, _, modTime := v.FileStat() - return &master_pb.VolumeInformationMessage{ + volumInfo := &master_pb.VolumeInformationMessage{ Id: uint32(v.Id), Size: size, Collection: v.Collection, - FileCount: uint64(v.FileCount()), - DeleteCount: uint64(v.DeletedCount()), + FileCount: v.FileCount(), + DeleteCount: v.DeletedCount(), DeletedByteCount: v.DeletedSize(), - ReadOnly: v.readOnly, + ReadOnly: v.noWriteOrDelete || v.noWriteCanDelete, ReplicaPlacement: uint32(v.ReplicaPlacement.Byte()), Version: uint32(v.Version()), Ttl: v.Ttl.ToUint32(), CompactRevision: uint32(v.SuperBlock.CompactionRevision), ModifiedAtSecond: modTime.Unix(), } + + volumInfo.RemoteStorageName, volumInfo.RemoteStorageKey = v.RemoteStorageNameKey() + + return volumInfo +} + +func (v *Volume) RemoteStorageNameKey() (storageName, storageKey string) { + if v.volumeInfo == nil { + return + } + if len(v.volumeInfo.GetFiles()) == 0 { + return + } + return v.volumeInfo.GetFiles()[0].BackendName(), v.volumeInfo.GetFiles()[0].GetKey() } diff --git a/weed/storage/volume_backup.go b/weed/storage/volume_backup.go index fe0506917..f7075fe2b 100644 --- a/weed/storage/volume_backup.go +++ b/weed/storage/volume_backup.go @@ -6,17 +6,19 @@ import ( "io" "os" + "google.golang.org/grpc" + 
"github.com/chrislusf/seaweedfs/weed/operation" "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb" "github.com/chrislusf/seaweedfs/weed/storage/idx" "github.com/chrislusf/seaweedfs/weed/storage/needle" + "github.com/chrislusf/seaweedfs/weed/storage/super_block" . "github.com/chrislusf/seaweedfs/weed/storage/types" - "google.golang.org/grpc" ) func (v *Volume) GetVolumeSyncStatus() *volume_server_pb.VolumeSyncStatusResponse { - v.dataFileAccessLock.Lock() - defer v.dataFileAccessLock.Unlock() + v.dataFileAccessLock.RLock() + defer v.dataFileAccessLock.RUnlock() var syncStatus = &volume_server_pb.VolumeSyncStatusResponse{} if datSize, _, err := v.DataBackend.GetStat(); err == nil { @@ -62,8 +64,6 @@ update needle map when receiving new .dat bytes. But seems not necessary now.) func (v *Volume) IncrementalBackup(volumeServer string, grpcDialOption grpc.DialOption) error { - ctx := context.Background() - startFromOffset, _, _ := v.FileStat() appendAtNs, err := v.findLastAppendAtNs() if err != nil { @@ -74,7 +74,7 @@ func (v *Volume) IncrementalBackup(volumeServer string, grpcDialOption grpc.Dial err = operation.WithVolumeServerClient(volumeServer, grpcDialOption, func(client volume_server_pb.VolumeServerClient) error { - stream, err := client.VolumeIncrementalCopy(ctx, &volume_server_pb.VolumeIncrementalCopyRequest{ + stream, err := client.VolumeIncrementalCopy(context.Background(), &volume_server_pb.VolumeIncrementalCopyRequest{ VolumeId: uint32(v.Id), SinceNs: appendAtNs, }) @@ -108,7 +108,7 @@ func (v *Volume) IncrementalBackup(volumeServer string, grpcDialOption grpc.Dial } // add to needle map - return ScanVolumeFileFrom(v.version, v.DataBackend, int64(startFromOffset), &VolumeFileScanner4GenIdx{v: v}) + return ScanVolumeFileFrom(v.Version(), v.DataBackend, int64(startFromOffset), &VolumeFileScanner4GenIdx{v: v}) } @@ -154,11 +154,11 @@ func (v *Volume) locateLastAppendEntry() (Offset, error) { func (v *Volume) readAppendAtNs(offset Offset) (uint64, error) { - n, _, bodyLength, err := needle.ReadNeedleHeader(v.DataBackend, v.SuperBlock.version, offset.ToAcutalOffset()) + n, _, bodyLength, err := needle.ReadNeedleHeader(v.DataBackend, v.SuperBlock.Version, offset.ToAcutalOffset()) if err != nil { return 0, fmt.Errorf("ReadNeedleHeader: %v", err) } - _, err = n.ReadNeedleBody(v.DataBackend, v.SuperBlock.version, offset.ToAcutalOffset()+int64(NeedleHeaderSize), bodyLength) + _, err = n.ReadNeedleBody(v.DataBackend, v.SuperBlock.Version, offset.ToAcutalOffset()+int64(NeedleHeaderSize), bodyLength) if err != nil { return 0, fmt.Errorf("ReadNeedleBody offset %d, bodyLength %d: %v", offset.ToAcutalOffset(), bodyLength, err) } @@ -244,7 +244,7 @@ type VolumeFileScanner4GenIdx struct { v *Volume } -func (scanner *VolumeFileScanner4GenIdx) VisitSuperBlock(superBlock SuperBlock) error { +func (scanner *VolumeFileScanner4GenIdx) VisitSuperBlock(superBlock super_block.SuperBlock) error { return nil } diff --git a/weed/storage/volume_checking.go b/weed/storage/volume_checking.go index 61b59e9f7..a65c2a3ff 100644 --- a/weed/storage/volume_checking.go +++ b/weed/storage/volume_checking.go @@ -55,7 +55,7 @@ func readIndexEntryAtOffset(indexFile *os.File, offset int64) (bytes []byte, err return } -func verifyNeedleIntegrity(datFile backend.DataStorageBackend, v needle.Version, offset int64, key NeedleId, size uint32) (lastAppendAtNs uint64, err error) { +func verifyNeedleIntegrity(datFile backend.BackendStorageFile, v needle.Version, offset int64, key NeedleId, size uint32) (lastAppendAtNs uint64, 
err error) { n := new(needle.Needle) if err = n.ReadData(datFile, offset, size, v); err != nil { return n.AppendAtNs, err diff --git a/weed/storage/volume_create.go b/weed/storage/volume_create.go index b27a62990..ffcb246a4 100644 --- a/weed/storage/volume_create.go +++ b/weed/storage/volume_create.go @@ -9,10 +9,13 @@ import ( "github.com/chrislusf/seaweedfs/weed/storage/backend" ) -func createVolumeFile(fileName string, preallocate int64, memoryMapSizeMB uint32) (backend.DataStorageBackend, error) { +func createVolumeFile(fileName string, preallocate int64, memoryMapSizeMB uint32) (backend.BackendStorageFile, error) { file, e := os.OpenFile(fileName, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0644) + if e != nil { + return nil, e + } if preallocate > 0 { glog.V(0).Infof("Preallocated disk space for %s is not supported", fileName) } - return backend.NewDiskFile(file), e + return backend.NewDiskFile(file), nil } diff --git a/weed/storage/volume_create_linux.go b/weed/storage/volume_create_linux.go index e3305d991..ee599ac32 100644 --- a/weed/storage/volume_create_linux.go +++ b/weed/storage/volume_create_linux.go @@ -10,11 +10,14 @@ import ( "github.com/chrislusf/seaweedfs/weed/storage/backend" ) -func createVolumeFile(fileName string, preallocate int64, memoryMapSizeMB uint32) (backend.DataStorageBackend, error) { +func createVolumeFile(fileName string, preallocate int64, memoryMapSizeMB uint32) (backend.BackendStorageFile, error) { file, e := os.OpenFile(fileName, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0644) + if e != nil { + return nil, e + } if preallocate != 0 { syscall.Fallocate(int(file.Fd()), 1, 0, preallocate) glog.V(0).Infof("Preallocated %d bytes disk space for %s", preallocate, fileName) } - return backend.NewDiskFile(file), e + return backend.NewDiskFile(file), nil } diff --git a/weed/storage/volume_create_windows.go b/weed/storage/volume_create_windows.go index 81536810b..e1c0b961f 100644 --- a/weed/storage/volume_create_windows.go +++ b/weed/storage/volume_create_windows.go @@ -11,18 +11,23 @@ import ( "github.com/chrislusf/seaweedfs/weed/storage/backend/memory_map/os_overloads" ) -func createVolumeFile(fileName string, preallocate int64, memoryMapSizeMB uint32) (backend.DataStorageBackend, error) { - +func createVolumeFile(fileName string, preallocate int64, memoryMapSizeMB uint32) (backend.BackendStorageFile, error) { if preallocate > 0 { glog.V(0).Infof("Preallocated disk space for %s is not supported", fileName) } if memoryMapSizeMB > 0 { file, e := os_overloads.OpenFile(fileName, windows.O_RDWR|windows.O_CREAT, 0644, true) - return memory_map.NewMemoryMappedFile(file, memoryMapSizeMB), e + if e != nil { + return nil, e + } + return memory_map.NewMemoryMappedFile(file, memoryMapSizeMB), nil } else { file, e := os_overloads.OpenFile(fileName, windows.O_RDWR|windows.O_CREAT|windows.O_TRUNC, 0644, false) - return backend.NewDiskFile(file), e + if e != nil { + return nil, e + } + return backend.NewDiskFile(file), nil } } diff --git a/weed/storage/volume_info.go b/weed/storage/volume_info.go index 111058b6e..313818cde 100644 --- a/weed/storage/volume_info.go +++ b/weed/storage/volume_info.go @@ -6,37 +6,42 @@ import ( "github.com/chrislusf/seaweedfs/weed/pb/master_pb" "github.com/chrislusf/seaweedfs/weed/storage/needle" + "github.com/chrislusf/seaweedfs/weed/storage/super_block" ) type VolumeInfo struct { - Id needle.VolumeId - Size uint64 - ReplicaPlacement *ReplicaPlacement - Ttl *needle.TTL - Collection string - Version needle.Version - FileCount int - DeleteCount int - DeletedByteCount 
uint64 - ReadOnly bool - CompactRevision uint32 - ModifiedAtSecond int64 + Id needle.VolumeId + Size uint64 + ReplicaPlacement *super_block.ReplicaPlacement + Ttl *needle.TTL + Collection string + Version needle.Version + FileCount int + DeleteCount int + DeletedByteCount uint64 + ReadOnly bool + CompactRevision uint32 + ModifiedAtSecond int64 + RemoteStorageName string + RemoteStorageKey string } func NewVolumeInfo(m *master_pb.VolumeInformationMessage) (vi VolumeInfo, err error) { vi = VolumeInfo{ - Id: needle.VolumeId(m.Id), - Size: m.Size, - Collection: m.Collection, - FileCount: int(m.FileCount), - DeleteCount: int(m.DeleteCount), - DeletedByteCount: m.DeletedByteCount, - ReadOnly: m.ReadOnly, - Version: needle.Version(m.Version), - CompactRevision: m.CompactRevision, - ModifiedAtSecond: m.ModifiedAtSecond, + Id: needle.VolumeId(m.Id), + Size: m.Size, + Collection: m.Collection, + FileCount: int(m.FileCount), + DeleteCount: int(m.DeleteCount), + DeletedByteCount: m.DeletedByteCount, + ReadOnly: m.ReadOnly, + Version: needle.Version(m.Version), + CompactRevision: m.CompactRevision, + ModifiedAtSecond: m.ModifiedAtSecond, + RemoteStorageName: m.RemoteStorageName, + RemoteStorageKey: m.RemoteStorageKey, } - rp, e := NewReplicaPlacementFromByte(byte(m.ReplicaPlacement)) + rp, e := super_block.NewReplicaPlacementFromByte(byte(m.ReplicaPlacement)) if e != nil { return vi, e } @@ -51,7 +56,7 @@ func NewVolumeInfoFromShort(m *master_pb.VolumeShortInformationMessage) (vi Volu Collection: m.Collection, Version: needle.Version(m.Version), } - rp, e := NewReplicaPlacementFromByte(byte(m.ReplicaPlacement)) + rp, e := super_block.NewReplicaPlacementFromByte(byte(m.ReplicaPlacement)) if e != nil { return vi, e } @@ -60,6 +65,10 @@ func NewVolumeInfoFromShort(m *master_pb.VolumeShortInformationMessage) (vi Volu return vi, nil } +func (vi VolumeInfo) IsRemote() bool { + return vi.RemoteStorageName != "" +} + func (vi VolumeInfo) String() string { return fmt.Sprintf("Id:%d, Size:%d, ReplicaPlacement:%s, Collection:%s, Version:%v, FileCount:%d, DeleteCount:%d, DeletedByteCount:%d, ReadOnly:%v", vi.Id, vi.Size, vi.ReplicaPlacement, vi.Collection, vi.Version, vi.FileCount, vi.DeleteCount, vi.DeletedByteCount, vi.ReadOnly) @@ -67,18 +76,20 @@ func (vi VolumeInfo) String() string { func (vi VolumeInfo) ToVolumeInformationMessage() *master_pb.VolumeInformationMessage { return &master_pb.VolumeInformationMessage{ - Id: uint32(vi.Id), - Size: uint64(vi.Size), - Collection: vi.Collection, - FileCount: uint64(vi.FileCount), - DeleteCount: uint64(vi.DeleteCount), - DeletedByteCount: vi.DeletedByteCount, - ReadOnly: vi.ReadOnly, - ReplicaPlacement: uint32(vi.ReplicaPlacement.Byte()), - Version: uint32(vi.Version), - Ttl: vi.Ttl.ToUint32(), - CompactRevision: vi.CompactRevision, - ModifiedAtSecond: vi.ModifiedAtSecond, + Id: uint32(vi.Id), + Size: uint64(vi.Size), + Collection: vi.Collection, + FileCount: uint64(vi.FileCount), + DeleteCount: uint64(vi.DeleteCount), + DeletedByteCount: vi.DeletedByteCount, + ReadOnly: vi.ReadOnly, + ReplicaPlacement: uint32(vi.ReplicaPlacement.Byte()), + Version: uint32(vi.Version), + Ttl: vi.Ttl.ToUint32(), + CompactRevision: vi.CompactRevision, + ModifiedAtSecond: vi.ModifiedAtSecond, + RemoteStorageName: vi.RemoteStorageName, + RemoteStorageKey: vi.RemoteStorageKey, } } diff --git a/weed/storage/volume_loading.go b/weed/storage/volume_loading.go index 6f1d8fe40..6b42fc452 100644 --- a/weed/storage/volume_loading.go +++ b/weed/storage/volume_loading.go @@ -3,146 +3,148 @@ package 
 package storage

 import (
 	"fmt"
 	"os"
-	"time"
-	"github.com/chrislusf/seaweedfs/weed/stats"
-	"github.com/chrislusf/seaweedfs/weed/storage/backend"
-	"github.com/chrislusf/seaweedfs/weed/storage/needle"
 	"github.com/syndtr/goleveldb/leveldb/opt"
 	"github.com/chrislusf/seaweedfs/weed/glog"
+	"github.com/chrislusf/seaweedfs/weed/stats"
+	"github.com/chrislusf/seaweedfs/weed/storage/backend"
+	"github.com/chrislusf/seaweedfs/weed/storage/needle"
+	"github.com/chrislusf/seaweedfs/weed/storage/super_block"
+	"github.com/chrislusf/seaweedfs/weed/util"
 )

-func loadVolumeWithoutIndex(dirname string, collection string, id needle.VolumeId, needleMapKind NeedleMapType) (v *Volume, e error) {
+func loadVolumeWithoutIndex(dirname string, collection string, id needle.VolumeId, needleMapKind NeedleMapType) (v *Volume, err error) {
 	v = &Volume{dir: dirname, Collection: collection, Id: id}
-	v.SuperBlock = SuperBlock{}
+	v.SuperBlock = super_block.SuperBlock{}
 	v.needleMapKind = needleMapKind
-	e = v.load(false, false, needleMapKind, 0)
+	err = v.load(false, false, needleMapKind, 0)
 	return
 }

-func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool, needleMapKind NeedleMapType, preallocate int64) error {
-	var e error
+func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool, needleMapKind NeedleMapType, preallocate int64) (err error) {
 	fileName := v.FileName()
 	alreadyHasSuperBlock := false

-	// open dat file
-	if exists, canRead, canWrite, modifiedTime, fileSize := checkFile(fileName + ".dat"); exists {
+	hasVolumeInfoFile := v.maybeLoadVolumeInfo() && v.volumeInfo.Version != 0
+
+	if v.HasRemoteFile() {
+		v.noWriteCanDelete = true
+		v.noWriteOrDelete = false
+		glog.V(0).Infof("loading volume %d from remote %v", v.Id, v.volumeInfo.Files)
+		v.LoadRemoteFile()
+		alreadyHasSuperBlock = true
+	} else if exists, canRead, canWrite, modifiedTime, fileSize := util.CheckFile(fileName + ".dat"); exists {
+		// open dat file
 		if !canRead {
 			return fmt.Errorf("cannot read Volume Data file %s.dat", fileName)
 		}
 		var dataFile *os.File
 		if canWrite {
-			dataFile, e = os.OpenFile(fileName+".dat", os.O_RDWR|os.O_CREATE, 0644)
+			dataFile, err = os.OpenFile(fileName+".dat", os.O_RDWR|os.O_CREATE, 0644)
 		} else {
 			glog.V(0).Infoln("opening " + fileName + ".dat in READONLY mode")
-			dataFile, e = os.Open(fileName + ".dat")
-			v.readOnly = true
+			dataFile, err = os.Open(fileName + ".dat")
+			v.noWriteOrDelete = true
 		}
 		v.lastModifiedTsSeconds = uint64(modifiedTime.Unix())
-		if fileSize >= _SuperBlockSize {
+		if fileSize >= super_block.SuperBlockSize {
 			alreadyHasSuperBlock = true
 		}
 		v.DataBackend = backend.NewDiskFile(dataFile)
 	} else {
 		if createDatIfMissing {
-			v.DataBackend, e = createVolumeFile(fileName+".dat", preallocate, v.MemoryMapMaxSizeMb)
+			v.DataBackend, err = createVolumeFile(fileName+".dat", preallocate, v.MemoryMapMaxSizeMb)
 		} else {
 			return fmt.Errorf("Volume Data file %s.dat does not exist.", fileName)
 		}
 	}

-	if e != nil {
-		if !os.IsPermission(e) {
-			return fmt.Errorf("cannot load Volume Data %s.dat: %v", fileName, e)
+	if err != nil {
+		if !os.IsPermission(err) {
+			return fmt.Errorf("cannot load Volume Data %s.dat: %v", fileName, err)
 		} else {
-			return fmt.Errorf("load data file %s.dat: %v", fileName, e)
+			return fmt.Errorf("load data file %s.dat: %v", fileName, err)
 		}
 	}

 	if alreadyHasSuperBlock {
-		e = v.readSuperBlock()
+		err = v.readSuperBlock()
 	} else {
 		if !v.SuperBlock.Initialized() {
 			return fmt.Errorf("volume %s.dat not initialized", fileName)
 		}
-		e = v.maybeWriteSuperBlock()
+		err = v.maybeWriteSuperBlock()
 	}
-	if e == nil && alsoLoadIndex {
+	if err == nil && alsoLoadIndex {
 		var indexFile *os.File
-		if v.readOnly {
-			glog.V(1).Infoln("open to read file", fileName+".idx")
-			if indexFile, e = os.OpenFile(fileName+".idx", os.O_RDONLY, 0644); e != nil {
-				return fmt.Errorf("cannot read Volume Index %s.idx: %v", fileName, e)
+		if v.noWriteOrDelete {
+			glog.V(0).Infoln("open to read file", fileName+".idx")
+			if indexFile, err = os.OpenFile(fileName+".idx", os.O_RDONLY, 0644); err != nil {
+				return fmt.Errorf("cannot read Volume Index %s.idx: %v", fileName, err)
 			}
 		} else {
 			glog.V(1).Infoln("open to write file", fileName+".idx")
-			if indexFile, e = os.OpenFile(fileName+".idx", os.O_RDWR|os.O_CREATE, 0644); e != nil {
-				return fmt.Errorf("cannot write Volume Index %s.idx: %v", fileName, e)
+			if indexFile, err = os.OpenFile(fileName+".idx", os.O_RDWR|os.O_CREATE, 0644); err != nil {
+				return fmt.Errorf("cannot write Volume Index %s.idx: %v", fileName, err)
 			}
 		}
-		if v.lastAppendAtNs, e = CheckVolumeDataIntegrity(v, indexFile); e != nil {
-			v.readOnly = true
-			glog.V(0).Infof("volumeDataIntegrityChecking failed %v", e)
+		if v.lastAppendAtNs, err = CheckVolumeDataIntegrity(v, indexFile); err != nil {
+			v.noWriteOrDelete = true
+			glog.V(0).Infof("volumeDataIntegrityChecking failed %v", err)
 		}
-		switch needleMapKind {
-		case NeedleMapInMemory:
-			glog.V(0).Infoln("loading index", fileName+".idx", "to memory readonly", v.readOnly)
-			if v.nm, e = LoadCompactNeedleMap(indexFile); e != nil {
-				glog.V(0).Infof("loading index %s to memory error: %v", fileName+".idx", e)
-			}
-		case NeedleMapLevelDb:
-			glog.V(0).Infoln("loading leveldb", fileName+".ldb")
-			opts := &opt.Options{
-				BlockCacheCapacity:            2 * 1024 * 1024, // default value is 8MiB
-				WriteBuffer:                   1 * 1024 * 1024, // default value is 4MiB
-				CompactionTableSizeMultiplier: 10,              // default value is 1
-			}
-			if v.nm, e = NewLevelDbNeedleMap(fileName+".ldb", indexFile, opts); e != nil {
-				glog.V(0).Infof("loading leveldb %s error: %v", fileName+".ldb", e)
-			}
-		case NeedleMapLevelDbMedium:
-			glog.V(0).Infoln("loading leveldb medium", fileName+".ldb")
-			opts := &opt.Options{
-				BlockCacheCapacity:            4 * 1024 * 1024, // default value is 8MiB
-				WriteBuffer:                   2 * 1024 * 1024, // default value is 4MiB
-				CompactionTableSizeMultiplier: 10,              // default value is 1
-			}
-			if v.nm, e = NewLevelDbNeedleMap(fileName+".ldb", indexFile, opts); e != nil {
-				glog.V(0).Infof("loading leveldb %s error: %v", fileName+".ldb", e)
-			}
-		case NeedleMapLevelDbLarge:
-			glog.V(0).Infoln("loading leveldb large", fileName+".ldb")
-			opts := &opt.Options{
-				BlockCacheCapacity:            8 * 1024 * 1024, // default value is 8MiB
-				WriteBuffer:                   4 * 1024 * 1024, // default value is 4MiB
-				CompactionTableSizeMultiplier: 10,              // default value is 1
+
+		if v.noWriteOrDelete || v.noWriteCanDelete {
+			if v.nm, err = NewSortedFileNeedleMap(fileName, indexFile); err != nil {
+				glog.V(0).Infof("loading sorted db %s error: %v", fileName+".sdx", err)
 			}
-			if v.nm, e = NewLevelDbNeedleMap(fileName+".ldb", indexFile, opts); e != nil {
-				glog.V(0).Infof("loading leveldb %s error: %v", fileName+".ldb", e)
+		} else {
+			switch needleMapKind {
+			case NeedleMapInMemory:
+				glog.V(0).Infoln("loading index", fileName+".idx", "to memory")
+				if v.nm, err = LoadCompactNeedleMap(indexFile); err != nil {
+					glog.V(0).Infof("loading index %s to memory error: %v", fileName+".idx", err)
+				}
+			case NeedleMapLevelDb:
+				glog.V(0).Infoln("loading leveldb", fileName+".ldb")
+				opts := &opt.Options{
+					BlockCacheCapacity:            2 * 1024 * 1024, // default value is 8MiB
+					WriteBuffer:                   1 * 1024 * 1024, // default value is 4MiB
+					CompactionTableSizeMultiplier: 10,              // default value is 1
+				}
+				if v.nm, err = NewLevelDbNeedleMap(fileName+".ldb", indexFile, opts); err != nil {
+					glog.V(0).Infof("loading leveldb %s error: %v", fileName+".ldb", err)
+				}
+			case NeedleMapLevelDbMedium:
+				glog.V(0).Infoln("loading leveldb medium", fileName+".ldb")
+				opts := &opt.Options{
+					BlockCacheCapacity:            4 * 1024 * 1024, // default value is 8MiB
+					WriteBuffer:                   2 * 1024 * 1024, // default value is 4MiB
+					CompactionTableSizeMultiplier: 10,              // default value is 1
+				}
+				if v.nm, err = NewLevelDbNeedleMap(fileName+".ldb", indexFile, opts); err != nil {
+					glog.V(0).Infof("loading leveldb %s error: %v", fileName+".ldb", err)
+				}
+			case NeedleMapLevelDbLarge:
+				glog.V(0).Infoln("loading leveldb large", fileName+".ldb")
+				opts := &opt.Options{
+					BlockCacheCapacity:            8 * 1024 * 1024, // default value is 8MiB
+					WriteBuffer:                   4 * 1024 * 1024, // default value is 4MiB
+					CompactionTableSizeMultiplier: 10,              // default value is 1
+				}
+				if v.nm, err = NewLevelDbNeedleMap(fileName+".ldb", indexFile, opts); err != nil {
+					glog.V(0).Infof("loading leveldb %s error: %v", fileName+".ldb", err)
+				}
 			}
 		}
 	}

-	stats.VolumeServerVolumeCounter.WithLabelValues(v.Collection, "volume").Inc()
+	if !hasVolumeInfoFile {
+		v.volumeInfo.Version = uint32(v.SuperBlock.Version)
+		v.SaveVolumeInfo()
+	}

-	return e
-}
+	stats.VolumeServerVolumeCounter.WithLabelValues(v.Collection, "volume").Inc()

-func checkFile(filename string) (exists, canRead, canWrite bool, modTime time.Time, fileSize int64) {
-	exists = true
-	fi, err := os.Stat(filename)
-	if os.IsNotExist(err) {
-		exists = false
-		return
-	}
-	if fi.Mode()&0400 != 0 {
-		canRead = true
-	}
-	if fi.Mode()&0200 != 0 {
-		canWrite = true
-	}
-	modTime = fi.ModTime()
-	fileSize = fi.Size()
-	return
+	return err
 }
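The load path above now defers file inspection to util.CheckFile, which is the old checkFile helper (removed at the end of this file) promoted into the util package. For readers without the util source at hand, a sketch of the equivalent logic, mirroring the removed code:

package util

import (
	"os"
	"time"
)

// CheckFile reports whether the file exists, whether the permission
// bits allow reading and writing, its modification time, and its size,
// matching the checkFile helper removed from the storage package above.
func CheckFile(filename string) (exists, canRead, canWrite bool, modTime time.Time, fileSize int64) {
	exists = true
	fi, err := os.Stat(filename)
	if os.IsNotExist(err) {
		exists = false
		return
	}
	if fi.Mode()&0400 != 0 {
		canRead = true
	}
	if fi.Mode()&0200 != 0 {
		canWrite = true
	}
	modTime = fi.ModTime()
	fileSize = fi.Size()
	return
}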
"github.com/chrislusf/seaweedfs/weed/storage/types" ) @@ -45,22 +46,25 @@ func (v *Volume) Destroy() (err error) { err = fmt.Errorf("volume %d is compacting", v.Id) return } + storageName, storageKey := v.RemoteStorageNameKey() + if v.HasRemoteFile() && storageName != "" && storageKey != "" { + if backendStorage, found := backend.BackendStorages[storageName]; found { + backendStorage.DeleteFile(storageKey) + } + } v.Close() os.Remove(v.FileName() + ".dat") os.Remove(v.FileName() + ".idx") + os.Remove(v.FileName() + ".vif") + os.Remove(v.FileName() + ".sdx") os.Remove(v.FileName() + ".cpd") os.Remove(v.FileName() + ".cpx") os.RemoveAll(v.FileName() + ".ldb") - os.RemoveAll(v.FileName() + ".bdb") return } func (v *Volume) writeNeedle(n *needle.Needle) (offset uint64, size uint32, isUnchanged bool, err error) { - glog.V(4).Infof("writing needle %s", needle.NewFileIdFromNeedle(v.Id, n).String()) - if v.readOnly { - err = fmt.Errorf("%s is read-only", v.DataBackend.String()) - return - } + // glog.V(4).Infof("writing needle %s", needle.NewFileIdFromNeedle(v.Id, n).String()) v.dataFileAccessLock.Lock() defer v.dataFileAccessLock.Unlock() if v.isFileUnchanged(n) { @@ -110,9 +114,6 @@ func (v *Volume) writeNeedle(n *needle.Needle) (offset uint64, size uint32, isUn func (v *Volume) deleteNeedle(n *needle.Needle) (uint32, error) { glog.V(4).Infof("delete needle %s", needle.NewFileIdFromNeedle(v.Id, n).String()) - if v.readOnly { - return 0, fmt.Errorf("%s is read-only", v.DataBackend.String()) - } v.dataFileAccessLock.Lock() defer v.dataFileAccessLock.Unlock() nv, ok := v.nm.Get(n.Id) @@ -136,8 +137,8 @@ func (v *Volume) deleteNeedle(n *needle.Needle) (uint32, error) { // read fills in Needle content by looking up n.Id from NeedleMapper func (v *Volume) readNeedle(n *needle.Needle) (int, error) { - v.dataFileAccessLock.Lock() - defer v.dataFileAccessLock.Unlock() + v.dataFileAccessLock.RLock() + defer v.dataFileAccessLock.RUnlock() nv, ok := v.nm.Get(n.Id) if !ok || nv.Offset.IsZero() { @@ -171,7 +172,7 @@ func (v *Volume) readNeedle(n *needle.Needle) (int, error) { } type VolumeFileScanner interface { - VisitSuperBlock(SuperBlock) error + VisitSuperBlock(super_block.SuperBlock) error ReadNeedleBody() bool VisitNeedle(n *needle.Needle, offset int64, needleHeader, needleBody []byte) error } @@ -183,8 +184,10 @@ func ScanVolumeFile(dirname string, collection string, id needle.VolumeId, if v, err = loadVolumeWithoutIndex(dirname, collection, id, needleMapKind); err != nil { return fmt.Errorf("failed to load volume %d: %v", id, err) } - if err = volumeFileScanner.VisitSuperBlock(v.SuperBlock); err != nil { - return fmt.Errorf("failed to process volume %d super block: %v", id, err) + if v.volumeInfo.Version == 0 { + if err = volumeFileScanner.VisitSuperBlock(v.SuperBlock); err != nil { + return fmt.Errorf("failed to process volume %d super block: %v", id, err) + } } defer v.Close() @@ -195,13 +198,13 @@ func ScanVolumeFile(dirname string, collection string, id needle.VolumeId, return ScanVolumeFileFrom(version, v.DataBackend, offset, volumeFileScanner) } -func ScanVolumeFileFrom(version needle.Version, datBackend backend.DataStorageBackend, offset int64, volumeFileScanner VolumeFileScanner) (err error) { +func ScanVolumeFileFrom(version needle.Version, datBackend backend.BackendStorageFile, offset int64, volumeFileScanner VolumeFileScanner) (err error) { n, nh, rest, e := needle.ReadNeedleHeader(datBackend, version, offset) if e != nil { if e == io.EOF { return nil } - return fmt.Errorf("cannot read %s 
at offset %d: %v", datBackend.String(), offset, e) + return fmt.Errorf("cannot read %s at offset %d: %v", datBackend.Name(), offset, e) } for n != nil { var needleBody []byte diff --git a/weed/storage/volume_super_block.go b/weed/storage/volume_super_block.go index bce5af465..5e913e062 100644 --- a/weed/storage/volume_super_block.go +++ b/weed/storage/volume_super_block.go @@ -5,92 +5,29 @@ import ( "os" "github.com/chrislusf/seaweedfs/weed/glog" - "github.com/chrislusf/seaweedfs/weed/pb/master_pb" "github.com/chrislusf/seaweedfs/weed/storage/backend" "github.com/chrislusf/seaweedfs/weed/storage/needle" - "github.com/chrislusf/seaweedfs/weed/util" - "github.com/golang/protobuf/proto" + "github.com/chrislusf/seaweedfs/weed/storage/super_block" ) -const ( - _SuperBlockSize = 8 -) - -/* -* Super block currently has 8 bytes allocated for each volume. -* Byte 0: version, 1 or 2 -* Byte 1: Replica Placement strategy, 000, 001, 002, 010, etc -* Byte 2 and byte 3: Time to live. See TTL for definition -* Byte 4 and byte 5: The number of times the volume has been compacted. -* Rest bytes: Reserved - */ -type SuperBlock struct { - version needle.Version - ReplicaPlacement *ReplicaPlacement - Ttl *needle.TTL - CompactionRevision uint16 - Extra *master_pb.SuperBlockExtra - extraSize uint16 -} - -func (s *SuperBlock) BlockSize() int { - switch s.version { - case needle.Version2, needle.Version3: - return _SuperBlockSize + int(s.extraSize) - } - return _SuperBlockSize -} - -func (s *SuperBlock) Version() needle.Version { - return s.version -} -func (s *SuperBlock) Bytes() []byte { - header := make([]byte, _SuperBlockSize) - header[0] = byte(s.version) - header[1] = s.ReplicaPlacement.Byte() - s.Ttl.ToBytes(header[2:4]) - util.Uint16toBytes(header[4:6], s.CompactionRevision) - - if s.Extra != nil { - extraData, err := proto.Marshal(s.Extra) - if err != nil { - glog.Fatalf("cannot marshal super block extra %+v: %v", s.Extra, err) - } - extraSize := len(extraData) - if extraSize > 256*256-2 { - // reserve a couple of bits for future extension - glog.Fatalf("super block extra size is %d bigger than %d", extraSize, 256*256-2) - } - s.extraSize = uint16(extraSize) - util.Uint16toBytes(header[6:8], s.extraSize) - - header = append(header, extraData...) - } - - return header -} - -func (s *SuperBlock) Initialized() bool { - return s.ReplicaPlacement != nil && s.Ttl != nil -} - func (v *Volume) maybeWriteSuperBlock() error { datSize, _, e := v.DataBackend.GetStat() if e != nil { - glog.V(0).Infof("failed to stat datafile %s: %v", v.DataBackend.String(), e) + glog.V(0).Infof("failed to stat datafile %s: %v", v.DataBackend.Name(), e) return e } if datSize == 0 { - v.SuperBlock.version = needle.CurrentVersion + v.SuperBlock.Version = needle.CurrentVersion _, e = v.DataBackend.WriteAt(v.SuperBlock.Bytes(), 0) if e != nil && os.IsPermission(e) { //read-only, but zero length - recreate it! 
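The readNeedle change above only compiles if Volume.dataFileAccessLock is a sync.RWMutex: reads now take the shared lock and no longer serialize behind one another, while writeNeedle and deleteNeedle keep the exclusive lock. A minimal, self-contained sketch of the pattern (type and field names here are illustrative):

package main

import "sync"

type volume struct {
	mu   sync.RWMutex // shared for readers, exclusive for writers
	data map[uint64][]byte
}

func (v *volume) read(id uint64) []byte {
	v.mu.RLock() // many readers may hold this simultaneously
	defer v.mu.RUnlock()
	return v.data[id]
}

func (v *volume) write(id uint64, b []byte) {
	v.mu.Lock() // writers still exclude readers and each other
	defer v.mu.Unlock()
	v.data[id] = b
}

func main() {
	v := &volume{data: map[uint64][]byte{}}
	v.write(1, []byte("needle"))
	_ = v.read(1)
}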
diff --git a/weed/storage/volume_super_block.go b/weed/storage/volume_super_block.go
index bce5af465..5e913e062 100644
--- a/weed/storage/volume_super_block.go
+++ b/weed/storage/volume_super_block.go
@@ -5,92 +5,29 @@ import (
 	"os"

 	"github.com/chrislusf/seaweedfs/weed/glog"
-	"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
 	"github.com/chrislusf/seaweedfs/weed/storage/backend"
 	"github.com/chrislusf/seaweedfs/weed/storage/needle"
-	"github.com/chrislusf/seaweedfs/weed/util"
-	"github.com/golang/protobuf/proto"
+	"github.com/chrislusf/seaweedfs/weed/storage/super_block"
 )

-const (
-	_SuperBlockSize = 8
-)
-
-/*
-* Super block currently has 8 bytes allocated for each volume.
-* Byte 0: version, 1 or 2
-* Byte 1: Replica Placement strategy, 000, 001, 002, 010, etc
-* Byte 2 and byte 3: Time to live. See TTL for definition
-* Byte 4 and byte 5: The number of times the volume has been compacted.
-* Rest bytes: Reserved
- */
-type SuperBlock struct {
-	version            needle.Version
-	ReplicaPlacement   *ReplicaPlacement
-	Ttl                *needle.TTL
-	CompactionRevision uint16
-	Extra              *master_pb.SuperBlockExtra
-	extraSize          uint16
-}
-
-func (s *SuperBlock) BlockSize() int {
-	switch s.version {
-	case needle.Version2, needle.Version3:
-		return _SuperBlockSize + int(s.extraSize)
-	}
-	return _SuperBlockSize
-}
-
-func (s *SuperBlock) Version() needle.Version {
-	return s.version
-}
-func (s *SuperBlock) Bytes() []byte {
-	header := make([]byte, _SuperBlockSize)
-	header[0] = byte(s.version)
-	header[1] = s.ReplicaPlacement.Byte()
-	s.Ttl.ToBytes(header[2:4])
-	util.Uint16toBytes(header[4:6], s.CompactionRevision)
-
-	if s.Extra != nil {
-		extraData, err := proto.Marshal(s.Extra)
-		if err != nil {
-			glog.Fatalf("cannot marshal super block extra %+v: %v", s.Extra, err)
-		}
-		extraSize := len(extraData)
-		if extraSize > 256*256-2 {
-			// reserve a couple of bits for future extension
-			glog.Fatalf("super block extra size is %d bigger than %d", extraSize, 256*256-2)
-		}
-		s.extraSize = uint16(extraSize)
-		util.Uint16toBytes(header[6:8], s.extraSize)
-
-		header = append(header, extraData...)
-	}
-
-	return header
-}
-
-func (s *SuperBlock) Initialized() bool {
-	return s.ReplicaPlacement != nil && s.Ttl != nil
-}
-
 func (v *Volume) maybeWriteSuperBlock() error {
 	datSize, _, e := v.DataBackend.GetStat()
 	if e != nil {
-		glog.V(0).Infof("failed to stat datafile %s: %v", v.DataBackend.String(), e)
+		glog.V(0).Infof("failed to stat datafile %s: %v", v.DataBackend.Name(), e)
 		return e
 	}
 	if datSize == 0 {
-		v.SuperBlock.version = needle.CurrentVersion
+		v.SuperBlock.Version = needle.CurrentVersion
 		_, e = v.DataBackend.WriteAt(v.SuperBlock.Bytes(), 0)
 		if e != nil && os.IsPermission(e) {
 			//read-only, but zero length - recreate it!
 			var dataFile *os.File
-			if dataFile, e = os.Create(v.DataBackend.String()); e == nil {
+			if dataFile, e = os.Create(v.DataBackend.Name()); e == nil {
 				v.DataBackend = backend.NewDiskFile(dataFile)
 				if _, e = v.DataBackend.WriteAt(v.SuperBlock.Bytes(), 0); e == nil {
-					v.readOnly = false
+					v.noWriteOrDelete = false
+					v.noWriteCanDelete = false
 				}
 			}
 		}
@@ -99,38 +36,13 @@ func (v *Volume) maybeWriteSuperBlock() error {
 }

 func (v *Volume) readSuperBlock() (err error) {
-	v.SuperBlock, err = ReadSuperBlock(v.DataBackend)
-	return err
-}
-
-// ReadSuperBlock reads from data file and load it into volume's super block
-func ReadSuperBlock(datBackend backend.DataStorageBackend) (superBlock SuperBlock, err error) {
-
-	header := make([]byte, _SuperBlockSize)
-	if _, e := datBackend.ReadAt(header, 0); e != nil {
-		err = fmt.Errorf("cannot read volume %s super block: %v", datBackend.String(), e)
-		return
-	}
-
-	superBlock.version = needle.Version(header[0])
-	if superBlock.ReplicaPlacement, err = NewReplicaPlacementFromByte(header[1]); err != nil {
-		err = fmt.Errorf("cannot read replica type: %s", err.Error())
-		return
-	}
-	superBlock.Ttl = needle.LoadTTLFromBytes(header[2:4])
-	superBlock.CompactionRevision = util.BytesToUint16(header[4:6])
-	superBlock.extraSize = util.BytesToUint16(header[6:8])
-
-	if superBlock.extraSize > 0 {
-		// read more
-		extraData := make([]byte, int(superBlock.extraSize))
-		superBlock.Extra = &master_pb.SuperBlockExtra{}
-		err = proto.Unmarshal(extraData, superBlock.Extra)
-		if err != nil {
-			err = fmt.Errorf("cannot read volume %s super block extra: %v", datBackend.String(), err)
-			return
+	v.SuperBlock, err = super_block.ReadSuperBlock(v.DataBackend)
+	if v.volumeInfo != nil && v.volumeInfo.Replication != "" {
+		if replication, err := super_block.NewReplicaPlacementFromString(v.volumeInfo.Replication); err != nil {
+			return fmt.Errorf("Error parse volume %d replication %s : %v", v.Id, v.volumeInfo.Replication, err)
+		} else {
+			v.SuperBlock.ReplicaPlacement = replication
 		}
 	}
-
-	return
+	return err
 }
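The 8-byte header layout documented in the removed comment is unchanged by the move into the super_block package: byte 0 is the version, byte 1 the replica placement, bytes 2-3 the TTL, bytes 4-5 the compaction revision, and bytes 6-7 the extra-data size. A standalone sketch of decoding those bytes, assuming the big-endian order of the util.BytesToUint16 helper it replaces:

package main

import (
	"encoding/binary"
	"fmt"
)

func main() {
	// version 3, replication 001, no TTL, compacted 5 times, no extra data
	header := []byte{3, 0x01, 0, 0, 0, 5, 0, 0}

	version := header[0]
	replicaPlacement := header[1]
	compactionRevision := binary.BigEndian.Uint16(header[4:6])
	extraSize := binary.BigEndian.Uint16(header[6:8])

	fmt.Println(version, replicaPlacement, compactionRevision, extraSize) // 3 1 5 0
}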
diff --git a/weed/storage/volume_tier.go b/weed/storage/volume_tier.go
new file mode 100644
index 000000000..fd7b08654
--- /dev/null
+++ b/weed/storage/volume_tier.go
@@ -0,0 +1,50 @@
+package storage
+
+import (
+	"github.com/chrislusf/seaweedfs/weed/glog"
+	"github.com/chrislusf/seaweedfs/weed/pb"
+	"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
+	"github.com/chrislusf/seaweedfs/weed/storage/backend"
+	_ "github.com/chrislusf/seaweedfs/weed/storage/backend/s3_backend"
+)
+
+func (v *Volume) GetVolumeInfo() *volume_server_pb.VolumeInfo {
+	return v.volumeInfo
+}
+
+func (v *Volume) maybeLoadVolumeInfo() (found bool) {
+
+	v.volumeInfo, v.hasRemoteFile, _ = pb.MaybeLoadVolumeInfo(v.FileName() + ".vif")
+
+	if v.hasRemoteFile {
+		glog.V(0).Infof("volume %d is tiered to %s as %s and read only", v.Id,
+			v.volumeInfo.Files[0].BackendName(), v.volumeInfo.Files[0].Key)
+	}
+
+	return
+
+}
+
+func (v *Volume) HasRemoteFile() bool {
+	return v.hasRemoteFile
+}
+
+func (v *Volume) LoadRemoteFile() error {
+	tierFile := v.volumeInfo.GetFiles()[0]
+	backendStorage := backend.BackendStorages[tierFile.BackendName()]
+
+	if v.DataBackend != nil {
+		v.DataBackend.Close()
+	}
+
+	v.DataBackend = backendStorage.NewStorageFile(tierFile.Key, v.volumeInfo)
+	return nil
+}
+
+func (v *Volume) SaveVolumeInfo() error {
+
+	tierFileName := v.FileName() + ".vif"
+
+	return pb.SaveVolumeInfo(tierFileName, v.volumeInfo)
+
+}
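Note the blank import of the s3_backend package above: it exists only for its init side effect, which is presumably to register an S3 factory into backend.BackendStorageFactories so that LoadConfiguration and LoadFromPbStorageBackends can look it up by type name. A generic, self-contained sketch of that registration pattern (all names here are illustrative, not taken from this diff):

package main

import "fmt"

type Factory interface{ Name() string }

// registry stands in for backend.BackendStorageFactories
var registry = map[string]Factory{}

type s3Factory struct{}

func (s3Factory) Name() string { return "s3" }

// init runs on import, so a blank import ( _ "path/to/s3_backend" )
// is enough to make the backend type discoverable, with no explicit
// dependency from the storage package on the backend implementation.
func init() { registry["s3"] = s3Factory{} }

func main() { fmt.Println(registry["s3"].Name()) }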
diff --git a/weed/storage/volume_vacuum.go b/weed/storage/volume_vacuum.go
index e90746b54..5d0d63877 100644
--- a/weed/storage/volume_vacuum.go
+++ b/weed/storage/volume_vacuum.go
@@ -3,6 +3,7 @@ package storage
 import (
 	"fmt"
 	"os"
+	"runtime"
 	"time"

 	"github.com/chrislusf/seaweedfs/weed/glog"
@@ -11,6 +12,7 @@ import (
 	idx2 "github.com/chrislusf/seaweedfs/weed/storage/idx"
 	"github.com/chrislusf/seaweedfs/weed/storage/needle"
 	"github.com/chrislusf/seaweedfs/weed/storage/needle_map"
+	"github.com/chrislusf/seaweedfs/weed/storage/super_block"
 	. "github.com/chrislusf/seaweedfs/weed/storage/types"
 	"github.com/chrislusf/seaweedfs/weed/util"
 )
@@ -19,101 +21,124 @@ func (v *Volume) garbageLevel() float64 {
 	if v.ContentSize() == 0 {
 		return 0
 	}
-	return float64(v.DeletedSize()) / float64(v.ContentSize())
+	deletedSize := v.DeletedSize()
+	fileSize := v.ContentSize()
+	if v.DeletedCount() > 0 && v.DeletedSize() == 0 {
+		// this happens for .sdx converted back to normal .idx
+		// where deleted entry size is missing
+		datFileSize, _, _ := v.FileStat()
+		deletedSize = datFileSize - fileSize - super_block.SuperBlockSize
+		fileSize = datFileSize
+	}
+	return float64(deletedSize) / float64(fileSize)
 }

+// compact a volume based on deletions in .dat files
 func (v *Volume) Compact(preallocate int64, compactionBytePerSecond int64) error {
-	if v.MemoryMapMaxSizeMb == 0 { //it makes no sense to compact in memory
-		glog.V(3).Infof("Compacting volume %d ...", v.Id)
-		//no need to lock for copy on write
-		//v.accessLock.Lock()
-		//defer v.accessLock.Unlock()
-		//glog.V(3).Infof("Got Compaction lock...")
-		v.isCompacting = true
-		defer func() {
-			v.isCompacting = false
-		}()
-
-		filePath := v.FileName()
-		v.lastCompactIndexOffset = v.IndexFileSize()
-		v.lastCompactRevision = v.SuperBlock.CompactionRevision
-		glog.V(3).Infof("creating copies for volume %d ,last offset %d...", v.Id, v.lastCompactIndexOffset)
-		return v.copyDataAndGenerateIndexFile(filePath+".cpd", filePath+".cpx", preallocate, compactionBytePerSecond)
-	} else {
+	if v.MemoryMapMaxSizeMb != 0 { //it makes no sense to compact in memory
 		return nil
 	}
+	glog.V(3).Infof("Compacting volume %d ...", v.Id)
+	//no need to lock for copy on write
+	//v.accessLock.Lock()
+	//defer v.accessLock.Unlock()
+	//glog.V(3).Infof("Got Compaction lock...")
+	v.isCompacting = true
+	defer func() {
+		v.isCompacting = false
+	}()
+
+	filePath := v.FileName()
+	v.lastCompactIndexOffset = v.IndexFileSize()
+	v.lastCompactRevision = v.SuperBlock.CompactionRevision
+	glog.V(3).Infof("creating copies for volume %d ,last offset %d...", v.Id, v.lastCompactIndexOffset)
+	return v.copyDataAndGenerateIndexFile(filePath+".cpd", filePath+".cpx", preallocate, compactionBytePerSecond)
 }

-func (v *Volume) Compact2() error {
-
-	if v.MemoryMapMaxSizeMb == 0 { //it makes no sense to compact in memory
-		glog.V(3).Infof("Compact2 volume %d ...", v.Id)
+// compact a volume based on deletions in .idx files
+func (v *Volume) Compact2(preallocate int64) error {

-		v.isCompacting = true
-		defer func() {
-			v.isCompacting = false
-		}()
-
-		filePath := v.FileName()
-		glog.V(3).Infof("creating copies for volume %d ...", v.Id)
-		return v.copyDataBasedOnIndexFile(filePath+".cpd", filePath+".cpx")
-	} else {
+	if v.MemoryMapMaxSizeMb != 0 { //it makes no sense to compact in memory
 		return nil
 	}
+	glog.V(3).Infof("Compact2 volume %d ...", v.Id)
+
+	v.isCompacting = true
+	defer func() {
+		v.isCompacting = false
+	}()
+
+	filePath := v.FileName()
+	v.lastCompactIndexOffset = v.IndexFileSize()
+	v.lastCompactRevision = v.SuperBlock.CompactionRevision
+	glog.V(3).Infof("creating copies for volume %d ...", v.Id)
+	return copyDataBasedOnIndexFile(filePath+".dat", filePath+".idx", filePath+".cpd", filePath+".cpx", v.SuperBlock, v.Version(), preallocate)
 }

 func (v *Volume) CommitCompact() error {
-	if v.MemoryMapMaxSizeMb == 0 { //it makes no sense to compact in memory
-		glog.V(0).Infof("Committing volume %d vacuuming...", v.Id)
+	if v.MemoryMapMaxSizeMb != 0 { //it makes no sense to compact in memory
+		return nil
+	}
+	glog.V(0).Infof("Committing volume %d vacuuming...", v.Id)

-		v.isCompacting = true
-		defer func() {
-			v.isCompacting = false
-		}()
+	v.isCompacting = true
+	defer func() {
+		v.isCompacting = false
+	}()

-		v.dataFileAccessLock.Lock()
-		defer v.dataFileAccessLock.Unlock()
+	v.dataFileAccessLock.Lock()
+	defer v.dataFileAccessLock.Unlock()

-		glog.V(3).Infof("Got volume %d committing lock...", v.Id)
-		v.nm.Close()
+	glog.V(3).Infof("Got volume %d committing lock...", v.Id)
+	v.nm.Close()
+	if v.DataBackend != nil {
 		if err := v.DataBackend.Close(); err != nil {
 			glog.V(0).Infof("fail to close volume %d", v.Id)
 		}
-		v.DataBackend = nil
-		stats.VolumeServerVolumeCounter.WithLabelValues(v.Collection, "volume").Dec()
-
-		var e error
-		if e = v.makeupDiff(v.FileName()+".cpd", v.FileName()+".cpx", v.FileName()+".dat", v.FileName()+".idx"); e != nil {
-			glog.V(0).Infof("makeupDiff in CommitCompact volume %d failed %v", v.Id, e)
-			e = os.Remove(v.FileName() + ".cpd")
+	}
+	v.DataBackend = nil
+	stats.VolumeServerVolumeCounter.WithLabelValues(v.Collection, "volume").Dec()
+
+	var e error
+	if e = v.makeupDiff(v.FileName()+".cpd", v.FileName()+".cpx", v.FileName()+".dat", v.FileName()+".idx"); e != nil {
+		glog.V(0).Infof("makeupDiff in CommitCompact volume %d failed %v", v.Id, e)
+		e = os.Remove(v.FileName() + ".cpd")
+		if e != nil {
+			return e
+		}
+		e = os.Remove(v.FileName() + ".cpx")
+		if e != nil {
+			return e
+		}
+	} else {
+		if runtime.GOOS == "windows" {
+			e = os.RemoveAll(v.FileName() + ".dat")
 			if e != nil {
 				return e
 			}
-			e = os.Remove(v.FileName() + ".cpx")
+			e = os.RemoveAll(v.FileName() + ".idx")
 			if e != nil {
 				return e
 			}
-		} else {
-			var e error
-			if e = os.Rename(v.FileName()+".cpd", v.FileName()+".dat"); e != nil {
-				return fmt.Errorf("rename %s: %v", v.FileName()+".cpd", e)
-			}
-			if e = os.Rename(v.FileName()+".cpx", v.FileName()+".idx"); e != nil {
-				return fmt.Errorf("rename %s: %v", v.FileName()+".cpx", e)
-			}
 		}
+		var e error
+		if e = os.Rename(v.FileName()+".cpd", v.FileName()+".dat"); e != nil {
+			return fmt.Errorf("rename %s: %v", v.FileName()+".cpd", e)
+		}
+		if e = os.Rename(v.FileName()+".cpx", v.FileName()+".idx"); e != nil {
+			return fmt.Errorf("rename %s: %v", v.FileName()+".cpx", e)
+		}
+	}

-		//glog.V(3).Infof("Pretending to be vacuuming...")
-		//time.Sleep(20 * time.Second)
+	//glog.V(3).Infof("Pretending to be vacuuming...")
+	//time.Sleep(20 * time.Second)

-		os.RemoveAll(v.FileName() + ".ldb")
-		os.RemoveAll(v.FileName() + ".bdb")
+	os.RemoveAll(v.FileName() + ".ldb")

-		glog.V(3).Infof("Loading volume %d commit file...", v.Id)
-		if e = v.load(true, false, v.needleMapKind, 0); e != nil {
-			return e
-		}
+	glog.V(3).Infof("Loading volume %d commit file...", v.Id)
+	if e = v.load(true, false, v.needleMapKind, 0); e != nil {
+		return e
 	}
 	return nil
 }
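CommitCompact above publishes the compacted copies by renaming .cpd over .dat and .cpx over .idx; the new runtime.GOOS == "windows" branch removes the destination files first, because Windows rename, unlike POSIX rename, does not replace an existing file. A distilled, self-contained sketch of just that commit step:

package main

import (
	"os"
	"runtime"
)

// commitCopies renames the compacted copies over the live files,
// clearing the destinations first on Windows where rename cannot
// atomically replace an existing file.
func commitCopies(base string) error {
	if runtime.GOOS == "windows" {
		if err := os.RemoveAll(base + ".dat"); err != nil {
			return err
		}
		if err := os.RemoveAll(base + ".idx"); err != nil {
			return err
		}
	}
	if err := os.Rename(base+".cpd", base+".dat"); err != nil {
		return err
	}
	return os.Rename(base+".cpx", base+".idx")
}

func main() { _ = commitCopies("/tmp/volumes/1") } // illustrative path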
@@ -132,14 +157,15 @@ func (v *Volume) cleanupCompact() error {
 	return nil
 }

-func fetchCompactRevisionFromDatFile(datBackend backend.DataStorageBackend) (compactRevision uint16, err error) {
-	superBlock, err := ReadSuperBlock(datBackend)
+func fetchCompactRevisionFromDatFile(datBackend backend.BackendStorageFile) (compactRevision uint16, err error) {
+	superBlock, err := super_block.ReadSuperBlock(datBackend)
 	if err != nil {
 		return 0, err
 	}
 	return superBlock.CompactionRevision, nil
 }

+// if old .dat and .idx files are updated, this func tries to apply the same changes to new files accordingly
 func (v *Volume) makeupDiff(newDatFileName, newIdxFileName, oldDatFileName, oldIdxFileName string) (err error) {
 	var indexSize int64

@@ -150,6 +176,7 @@ func (v *Volume) makeupDiff(newDatFileName, newIdxFileName, oldDatFileName, oldI
 	oldDatBackend := backend.NewDiskFile(oldDatFile)
 	defer oldDatBackend.Close()

+	// skip if the old .idx file has not changed
 	if indexSize, err = verifyIndexFileIntegrity(oldIdxFile); err != nil {
 		return fmt.Errorf("verifyIndexFileIntegrity %s failed: %v", oldIdxFileName, err)
 	}
@@ -157,6 +184,7 @@ func (v *Volume) makeupDiff(newDatFileName, newIdxFileName, oldDatFileName, oldI
 		return nil
 	}

+	// fail if the old .dat file has changed to a new revision
 	oldDatCompactRevision, err := fetchCompactRevisionFromDatFile(oldDatBackend)
 	if err != nil {
 		return fmt.Errorf("fetchCompactRevisionFromDatFile src %s failed: %v", oldDatFile.Name(), err)
@@ -270,15 +298,15 @@ func (v *Volume) makeupDiff(newDatFileName, newIdxFileName, oldDatFileName, oldI

 type VolumeFileScanner4Vacuum struct {
 	version        needle.Version
 	v              *Volume
-	dstBackend     backend.DataStorageBackend
-	nm             *NeedleMap
+	dstBackend     backend.BackendStorageFile
+	nm             *needle_map.MemDb
 	newOffset      int64
 	now            uint64
 	writeThrottler *util.WriteThrottler
 }

-func (scanner *VolumeFileScanner4Vacuum) VisitSuperBlock(superBlock SuperBlock) error {
-	scanner.version = superBlock.Version()
+func (scanner *VolumeFileScanner4Vacuum) VisitSuperBlock(superBlock super_block.SuperBlock) error {
+	scanner.version = superBlock.Version
 	superBlock.CompactionRevision++
 	_, err := scanner.dstBackend.WriteAt(superBlock.Bytes(), 0)
 	scanner.newOffset = int64(superBlock.BlockSize())
@@ -296,7 +324,7 @@ func (scanner *VolumeFileScanner4Vacuum) VisitNeedle(n *needle.Needle, offset in
 	nv, ok := scanner.v.nm.Get(n.Id)
 	glog.V(4).Infoln("needle expected offset ", offset, "ok", ok, "nv", nv)
 	if ok && nv.Offset.ToAcutalOffset() == offset && nv.Size > 0 && nv.Size != TombstoneFileSize {
-		if err := scanner.nm.Put(n.Id, ToOffset(scanner.newOffset), n.Size); err != nil {
+		if err := scanner.nm.Set(n.Id, ToOffset(scanner.newOffset), n.Size); err != nil {
 			return fmt.Errorf("cannot put needle: %s", err)
 		}
 		if _, _, _, err := n.Append(scanner.dstBackend, scanner.v.Version()); err != nil {
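The nm field swap above, from a *NeedleMap writing through an open .idx file to an in-memory *needle_map.MemDb, moves index persistence to the end of the scan: entries are collected with Set and flushed once via SaveToIdx, as the copy functions below show. A minimal usage sketch of that API exactly as it appears in this diff, with an illustrative output path:

package main

import (
	"github.com/chrislusf/seaweedfs/weed/storage/needle_map"
	. "github.com/chrislusf/seaweedfs/weed/storage/types"
)

func main() {
	nm := needle_map.NewMemDb()
	defer nm.Close()

	// accumulate entries in memory while scanning the .dat file ...
	_ = nm.Set(NeedleId(1), ToOffset(8), 100)

	// ... then persist the whole index in one pass
	_ = nm.SaveToIdx("/tmp/1.cpx")
}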
@@ -312,90 +340,92 @@ func (scanner *VolumeFileScanner4Vacuum) VisitNeedle(n *needle.Needle, offset in

 func (v *Volume) copyDataAndGenerateIndexFile(dstName, idxName string, preallocate int64, compactionBytePerSecond int64) (err error) {
 	var (
-		dst backend.DataStorageBackend
-		idx *os.File
+		dst backend.BackendStorageFile
 	)
 	if dst, err = createVolumeFile(dstName, preallocate, 0); err != nil {
 		return
 	}
 	defer dst.Close()

-	if idx, err = os.OpenFile(idxName, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644); err != nil {
-		return
-	}
-	defer idx.Close()
+	nm := needle_map.NewMemDb()
+	defer nm.Close()

 	scanner := &VolumeFileScanner4Vacuum{
 		v:              v,
 		now:            uint64(time.Now().Unix()),
-		nm:             NewBtreeNeedleMap(idx),
+		nm:             nm,
 		dstBackend:     dst,
 		writeThrottler: util.NewWriteThrottler(compactionBytePerSecond),
 	}
 	err = ScanVolumeFile(v.dir, v.Collection, v.Id, v.needleMapKind, scanner)
+	if err != nil {
+		return nil
+	}
+
+	err = nm.SaveToIdx(idxName)
 	return
 }

-func (v *Volume) copyDataBasedOnIndexFile(dstName, idxName string) (err error) {
+func copyDataBasedOnIndexFile(srcDatName, srcIdxName, dstDatName, datIdxName string, sb super_block.SuperBlock, version needle.Version, preallocate int64) (err error) {
 	var (
-		dst, idx, oldIndexFile *os.File
+		srcDatBackend, dstDatBackend backend.BackendStorageFile
+		dataFile                     *os.File
 	)
-	if dst, err = os.OpenFile(dstName, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644); err != nil {
+	if dstDatBackend, err = createVolumeFile(dstDatName, preallocate, 0); err != nil {
 		return
 	}
-	dstDatBackend := backend.NewDiskFile(dst)
 	defer dstDatBackend.Close()

-	if idx, err = os.OpenFile(idxName, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644); err != nil {
+	oldNm := needle_map.NewMemDb()
+	defer oldNm.Close()
+	newNm := needle_map.NewMemDb()
+	defer newNm.Close()
+	if err = oldNm.LoadFromIdx(srcIdxName); err != nil {
 		return
 	}
-	defer idx.Close()
-
-	if oldIndexFile, err = os.OpenFile(v.FileName()+".idx", os.O_RDONLY, 0644); err != nil {
-		return
+	if dataFile, err = os.Open(srcDatName); err != nil {
+		return err
 	}
-	defer oldIndexFile.Close()
+	srcDatBackend = backend.NewDiskFile(dataFile)
+	defer srcDatBackend.Close()

-	nm := NewBtreeNeedleMap(idx)
 	now := uint64(time.Now().Unix())

-	v.SuperBlock.CompactionRevision++
-	dst.Write(v.SuperBlock.Bytes())
-	newOffset := int64(v.SuperBlock.BlockSize())
+	sb.CompactionRevision++
+	dstDatBackend.WriteAt(sb.Bytes(), 0)
+	newOffset := int64(sb.BlockSize())

-	idx2.WalkIndexFile(oldIndexFile, func(key NeedleId, offset Offset, size uint32) error {
-		if offset.IsZero() || size == TombstoneFileSize {
-			return nil
-		}
+	oldNm.AscendingVisit(func(value needle_map.NeedleValue) error {
+
+		offset, size := value.Offset, value.Size

-		nv, ok := v.nm.Get(key)
-		if !ok {
+		if offset.IsZero() || size == TombstoneFileSize {
 			return nil
 		}

 		n := new(needle.Needle)
-		err := n.ReadData(v.DataBackend, offset.ToAcutalOffset(), size, v.Version())
+		err := n.ReadData(srcDatBackend, offset.ToAcutalOffset(), size, version)
 		if err != nil {
 			return nil
 		}

-		if n.HasTtl() && now >= n.LastModified+uint64(v.Ttl.Minutes()*60) {
+		if n.HasTtl() && now >= n.LastModified+uint64(sb.Ttl.Minutes()*60) {
 			return nil
 		}

-		glog.V(4).Infoln("needle expected offset ", offset, "ok", ok, "nv", nv)
-		if nv.Offset == offset && nv.Size > 0 {
-			if err = nm.Put(n.Id, ToOffset(newOffset), n.Size); err != nil {
-				return fmt.Errorf("cannot put needle: %s", err)
-			}
-			if _, _, _, err = n.Append(dstDatBackend, v.Version()); err != nil {
-				return fmt.Errorf("cannot append needle: %s", err)
-			}
-			newOffset += n.DiskSize(v.Version())
-			glog.V(3).Infoln("saving key", n.Id, "volume offset", offset, "=>", newOffset, "data_size", n.Size)
+		if err = newNm.Set(n.Id, ToOffset(newOffset), n.Size); err != nil {
+			return fmt.Errorf("cannot put needle: %s", err)
+		}
+		if _, _, _, err = n.Append(dstDatBackend, sb.Version); err != nil {
+			return fmt.Errorf("cannot append needle: %s", err)
 		}
+		newOffset += n.DiskSize(version)
+		glog.V(4).Infoln("saving key", n.Id, "volume offset", offset, "=>", newOffset, "data_size", n.Size)
+
 		return nil
 	})

+	newNm.SaveToIdx(datIdxName)
+
 	return
 }
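After this rewrite, index-based compaction no longer consults the volume's live needle map: copyDataBasedOnIndexFile loads the old .idx into a MemDb, walks it in ascending key order, and appends every live, unexpired needle to the new .dat while building the new index. An illustrative wrapper showing how Compact2 above wires up the arguments (Compact2 itself adds the isCompacting bookkeeping around this call):

package storage

// compactVolumeByIndex is a sketch of the call Compact2 makes; it is
// not part of this diff, only a restatement of the argument plumbing.
func compactVolumeByIndex(v *Volume, preallocate int64) error {
	filePath := v.FileName()
	return copyDataBasedOnIndexFile(
		filePath+".dat", filePath+".idx", // source data and index
		filePath+".cpd", filePath+".cpx", // compacted copies
		v.SuperBlock, v.Version(), preallocate)
}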
"github.com/chrislusf/seaweedfs/weed/storage/types" ) @@ -46,7 +47,7 @@ func TestMakeDiff(t *testing.T) { v := new(Volume) //lastCompactIndexOffset value is the index file size before step 4 v.lastCompactIndexOffset = 96 - v.SuperBlock.version = 0x2 + v.SuperBlock.Version = 0x2 /* err := v.makeupDiff( "/yourpath/1.cpd", @@ -68,7 +69,7 @@ func TestCompaction(t *testing.T) { } defer os.RemoveAll(dir) // clean up - v, err := NewVolume(dir, "", 1, NeedleMapInMemory, &ReplicaPlacement{}, &needle.TTL{}, 0, 0) + v, err := NewVolume(dir, "", 1, NeedleMapInMemory, &super_block.ReplicaPlacement{}, &needle.TTL{}, 0, 0) if err != nil { t.Fatalf("volume creation: %v", err) } @@ -83,7 +84,7 @@ func TestCompaction(t *testing.T) { } startTime := time.Now() - v.Compact(0, 1024*1024) + v.Compact2(0) speed := float64(v.ContentSize()) / time.Now().Sub(startTime).Seconds() t.Logf("compaction speed: %.2f bytes/s", speed) |
