Diffstat (limited to 'weed/storage')
-rw-r--r--  weed/storage/backend/backend.go  122
-rw-r--r--  weed/storage/backend/disk_file.go  4
-rw-r--r--  weed/storage/backend/memory_map/memory_map_backend.go  4
-rw-r--r--  weed/storage/backend/s3_backend/s3_backend.go  197
-rw-r--r--  weed/storage/backend/s3_backend/s3_download.go  98
-rw-r--r--  weed/storage/backend/s3_backend/s3_sessions.go  8
-rw-r--r--  weed/storage/backend/s3_backend/s3_upload.go  114
-rw-r--r--  weed/storage/disk_location.go  173
-rw-r--r--  weed/storage/disk_location_ec.go  14
-rw-r--r--  weed/storage/erasure_coding/ec_decoder.go  198
-rw-r--r--  weed/storage/erasure_coding/ec_encoder.go  30
-rw-r--r--  weed/storage/erasure_coding/ec_test.go  16
-rw-r--r--  weed/storage/erasure_coding/ec_volume.go  15
-rw-r--r--  weed/storage/erasure_coding/ec_volume_delete.go  10
-rw-r--r--  weed/storage/erasure_coding/ec_volume_info.go  16
-rw-r--r--  weed/storage/needle/crc.go  14
-rw-r--r--  weed/storage/needle/needle.go  61
-rw-r--r--  weed/storage/needle/needle_parse_multipart.go  109
-rw-r--r--  weed/storage/needle/needle_parse_upload.go  166
-rw-r--r--  weed/storage/needle/needle_read_write.go  14
-rw-r--r--  weed/storage/needle/volume_ttl.go  3
-rw-r--r--  weed/storage/needle_map/btree_map.go  53
-rw-r--r--  weed/storage/needle_map/compact_map_test.go  9
-rw-r--r--  weed/storage/needle_map/memdb.go  119
-rw-r--r--  weed/storage/needle_map/memdb_test.go  23
-rw-r--r--  weed/storage/needle_map_leveldb.go  13
-rw-r--r--  weed/storage/needle_map_memory.go  19
-rw-r--r--  weed/storage/needle_map_metric_test.go  7
-rw-r--r--  weed/storage/needle_map_sorted_file.go  105
-rw-r--r--  weed/storage/store.go  63
-rw-r--r--  weed/storage/store_ec.go  63
-rw-r--r--  weed/storage/store_ec_delete.go  20
-rw-r--r--  weed/storage/store_vacuum.go  3
-rw-r--r--  weed/storage/super_block/replica_placement.go (renamed from weed/storage/replica_placement.go)  2
-rw-r--r--  weed/storage/super_block/replica_placement_test.go (renamed from weed/storage/replica_placement_test.go)  2
-rw-r--r--  weed/storage/super_block/super_block.go  69
-rw-r--r--  weed/storage/super_block/super_block_read.go.go  44
-rw-r--r--  weed/storage/super_block/super_block_test.go (renamed from weed/storage/volume_super_block_test.go)  4
-rw-r--r--  weed/storage/volume.go  90
-rw-r--r--  weed/storage/volume_backup.go  20
-rw-r--r--  weed/storage/volume_checking.go  2
-rw-r--r--  weed/storage/volume_create.go  7
-rw-r--r--  weed/storage/volume_create_linux.go  7
-rw-r--r--  weed/storage/volume_create_windows.go  13
-rw-r--r--  weed/storage/volume_info.go  83
-rw-r--r--  weed/storage/volume_loading.go  172
-rw-r--r--  weed/storage/volume_read_write.go  35
-rw-r--r--  weed/storage/volume_super_block.go  114
-rw-r--r--  weed/storage/volume_tier.go  50
-rw-r--r--  weed/storage/volume_vacuum.go  252
-rw-r--r--  weed/storage/volume_vacuum_test.go  7
51 files changed, 1963 insertions, 893 deletions
diff --git a/weed/storage/backend/backend.go b/weed/storage/backend/backend.go
index 3c297f20b..6941ca5a1 100644
--- a/weed/storage/backend/backend.go
+++ b/weed/storage/backend/backend.go
@@ -2,18 +2,134 @@ package backend
import (
"io"
+ "os"
+ "strings"
"time"
+
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
+ "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
+ "github.com/spf13/viper"
)
-type DataStorageBackend interface {
+type BackendStorageFile interface {
io.ReaderAt
io.WriterAt
Truncate(off int64) error
io.Closer
GetStat() (datSize int64, modTime time.Time, err error)
- String() string
+ Name() string
+}
+
+type BackendStorage interface {
+ ToProperties() map[string]string
+ NewStorageFile(key string, tierInfo *volume_server_pb.VolumeInfo) BackendStorageFile
+ CopyFile(f *os.File, attributes map[string]string, fn func(progressed int64, percentage float32) error) (key string, size int64, err error)
+ DownloadFile(fileName string, key string, fn func(progressed int64, percentage float32) error) (size int64, err error)
+ DeleteFile(key string) (err error)
+}
+
+type StringProperties interface {
+ GetString(key string) string
+}
+type StorageType string
+type BackendStorageFactory interface {
+ StorageType() StorageType
+ BuildStorage(configuration StringProperties, configPrefix string, id string) (BackendStorage, error)
}
var (
- StorageBackends []DataStorageBackend
+ BackendStorageFactories = make(map[StorageType]BackendStorageFactory)
+ BackendStorages = make(map[string]BackendStorage)
)
+
+// used by master to load remote storage configurations
+func LoadConfiguration(config *viper.Viper) {
+
+ StorageBackendPrefix := "storage.backend"
+
+ for backendTypeName := range config.GetStringMap(StorageBackendPrefix) {
+ backendStorageFactory, found := BackendStorageFactories[StorageType(backendTypeName)]
+ if !found {
+ glog.Fatalf("backend storage type %s not found", backendTypeName)
+ }
+ for backendStorageId := range config.GetStringMap(StorageBackendPrefix + "." + backendTypeName) {
+ if !config.GetBool(StorageBackendPrefix + "." + backendTypeName + "." + backendStorageId + ".enabled") {
+ continue
+ }
+ backendStorage, buildErr := backendStorageFactory.BuildStorage(config,
+ StorageBackendPrefix+"."+backendTypeName+"."+backendStorageId+".", backendStorageId)
+ if buildErr != nil {
+ glog.Fatalf("fail to create backend storage %s.%s", backendTypeName, backendStorageId)
+ }
+ BackendStorages[backendTypeName+"."+backendStorageId] = backendStorage
+ if backendStorageId == "default" {
+ BackendStorages[backendTypeName] = backendStorage
+ }
+ }
+ }
+
+}
+
+// used by volume server to receive remote storage configurations from master
+func LoadFromPbStorageBackends(storageBackends []*master_pb.StorageBackend) {
+
+ for _, storageBackend := range storageBackends {
+ backendStorageFactory, found := BackendStorageFactories[StorageType(storageBackend.Type)]
+ if !found {
+ glog.Warningf("storage type %s not found", storageBackend.Type)
+ continue
+ }
+ backendStorage, buildErr := backendStorageFactory.BuildStorage(newProperties(storageBackend.Properties), "", storageBackend.Id)
+ if buildErr != nil {
+ glog.Fatalf("fail to create backend storage %s.%s", storageBackend.Type, storageBackend.Id)
+ }
+ BackendStorages[storageBackend.Type+"."+storageBackend.Id] = backendStorage
+ if storageBackend.Id == "default" {
+ BackendStorages[storageBackend.Type] = backendStorage
+ }
+ }
+}
+
+type Properties struct {
+ m map[string]string
+}
+
+func newProperties(m map[string]string) *Properties {
+ return &Properties{m: m}
+}
+
+func (p *Properties) GetString(key string) string {
+ if v, found := p.m[key]; found {
+ return v
+ }
+ return ""
+}
+
+func ToPbStorageBackends() (backends []*master_pb.StorageBackend) {
+ for sName, s := range BackendStorages {
+ sType, sId := BackendNameToTypeId(sName)
+ if sType == "" {
+ continue
+ }
+ backends = append(backends, &master_pb.StorageBackend{
+ Type: sType,
+ Id: sId,
+ Properties: s.ToProperties(),
+ })
+ }
+ return
+}
+
+func BackendNameToTypeId(backendName string) (backendType, backendId string) {
+ parts := strings.Split(backendName, ".")
+ if len(parts) == 1 {
+ return backendName, "default"
+ }
+ if len(parts) != 2 {
+ return
+ }
+
+ backendType, backendId = parts[0], parts[1]
+ return
+}
diff --git a/weed/storage/backend/disk_file.go b/weed/storage/backend/disk_file.go
index 7f2b39d15..c4b3caffb 100644
--- a/weed/storage/backend/disk_file.go
+++ b/weed/storage/backend/disk_file.go
@@ -6,7 +6,7 @@ import (
)
var (
- _ DataStorageBackend = &DiskFile{}
+ _ BackendStorageFile = &DiskFile{}
)
type DiskFile struct {
@@ -45,6 +45,6 @@ func (df *DiskFile) GetStat() (datSize int64, modTime time.Time, err error) {
return 0, time.Time{}, err
}
-func (df *DiskFile) String() string {
+func (df *DiskFile) Name() string {
return df.fullFilePath
}
diff --git a/weed/storage/backend/memory_map/memory_map_backend.go b/weed/storage/backend/memory_map/memory_map_backend.go
index bac105022..03e7308d0 100644
--- a/weed/storage/backend/memory_map/memory_map_backend.go
+++ b/weed/storage/backend/memory_map/memory_map_backend.go
@@ -8,7 +8,7 @@ import (
)
var (
- _ backend.DataStorageBackend = &MemoryMappedFile{}
+ _ backend.BackendStorageFile = &MemoryMappedFile{}
)
type MemoryMappedFile struct {
@@ -55,6 +55,6 @@ func (mmf *MemoryMappedFile) GetStat() (datSize int64, modTime time.Time, err er
return 0, time.Time{}, err
}
-func (mmf *MemoryMappedFile) String() string {
+func (mmf *MemoryMappedFile) Name() string {
return mmf.mm.File.Name()
}
diff --git a/weed/storage/backend/s3_backend/s3_backend.go b/weed/storage/backend/s3_backend/s3_backend.go
index 0ff7eca21..8d71861c2 100644
--- a/weed/storage/backend/s3_backend/s3_backend.go
+++ b/weed/storage/backend/s3_backend/s3_backend.go
@@ -2,119 +2,176 @@ package s3_backend
import (
"fmt"
+ "io"
+ "os"
"strings"
"time"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/aws/aws-sdk-go/service/s3/s3iface"
+ "github.com/google/uuid"
+
"github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
"github.com/chrislusf/seaweedfs/weed/storage/backend"
- "github.com/chrislusf/seaweedfs/weed/storage/needle"
- "github.com/chrislusf/seaweedfs/weed/util"
-)
-
-var (
- _ backend.DataStorageBackend = &S3Backend{}
)
func init() {
- backend.StorageBackends = append(backend.StorageBackends, &S3Backend{})
+ backend.BackendStorageFactories["s3"] = &S3BackendFactory{}
}
-type S3Backend struct {
- conn s3iface.S3API
- region string
- bucket string
- dir string
- vid needle.VolumeId
- key string
+type S3BackendFactory struct {
}
-func (s3backend S3Backend) ReadAt(p []byte, off int64) (n int, err error) {
- bytesRange := fmt.Sprintf("bytes=%d-%d", off, off+int64(len(p))-1)
- getObjectOutput, getObjectErr := s3backend.conn.GetObject(&s3.GetObjectInput{
- Bucket: &s3backend.bucket,
- Key: &s3backend.key,
- Range: &bytesRange,
- })
+func (factory *S3BackendFactory) StorageType() backend.StorageType {
+ return backend.StorageType("s3")
+}
+func (factory *S3BackendFactory) BuildStorage(configuration backend.StringProperties, configPrefix string, id string) (backend.BackendStorage, error) {
+ return newS3BackendStorage(configuration, configPrefix, id)
+}
- if getObjectErr != nil {
- return 0, fmt.Errorf("bucket %s GetObject %s: %v", s3backend.bucket, s3backend.key, getObjectErr)
+type S3BackendStorage struct {
+ id string
+ aws_access_key_id string
+ aws_secret_access_key string
+ region string
+ bucket string
+ conn s3iface.S3API
+}
+
+func newS3BackendStorage(configuration backend.StringProperties, configPrefix string, id string) (s *S3BackendStorage, err error) {
+ s = &S3BackendStorage{}
+ s.id = id
+ s.aws_access_key_id = configuration.GetString(configPrefix + "aws_access_key_id")
+ s.aws_secret_access_key = configuration.GetString(configPrefix + "aws_secret_access_key")
+ s.region = configuration.GetString(configPrefix + "region")
+ s.bucket = configuration.GetString(configPrefix + "bucket")
+ s.conn, err = createSession(s.aws_access_key_id, s.aws_secret_access_key, s.region)
+
+ glog.V(0).Infof("created backend storage s3.%s for region %s bucket %s", s.id, s.region, s.bucket)
+ return
+}
+
+func (s *S3BackendStorage) ToProperties() map[string]string {
+ m := make(map[string]string)
+ m["aws_access_key_id"] = s.aws_access_key_id
+ m["aws_secret_access_key"] = s.aws_secret_access_key
+ m["region"] = s.region
+ m["bucket"] = s.bucket
+ return m
+}
+
+func (s *S3BackendStorage) NewStorageFile(key string, tierInfo *volume_server_pb.VolumeInfo) backend.BackendStorageFile {
+ if strings.HasPrefix(key, "/") {
+ key = key[1:]
}
- defer getObjectOutput.Body.Close()
- return getObjectOutput.Body.Read(p)
+ f := &S3BackendStorageFile{
+ backendStorage: s,
+ key: key,
+ tierInfo: tierInfo,
+ }
+ return f
}
-func (s3backend S3Backend) WriteAt(p []byte, off int64) (n int, err error) {
- panic("implement me")
+func (s *S3BackendStorage) CopyFile(f *os.File, attributes map[string]string, fn func(progressed int64, percentage float32) error) (key string, size int64, err error) {
+ randomUuid, _ := uuid.NewRandom()
+ key = randomUuid.String()
+
+ glog.V(1).Infof("copying dat file of %s to remote s3.%s as %s", f.Name(), s.id, key)
+
+ size, err = uploadToS3(s.conn, f.Name(), s.bucket, key, attributes, fn)
+
+ return
}
-func (s3backend S3Backend) Truncate(off int64) error {
- panic("implement me")
+func (s *S3BackendStorage) DownloadFile(fileName string, key string, fn func(progressed int64, percentage float32) error) (size int64, err error) {
+
+ glog.V(1).Infof("download dat file of %s from remote s3.%s as %s", fileName, s.id, key)
+
+ size, err = downloadFromS3(s.conn, fileName, s.bucket, key, fn)
+
+ return
}
-func (s3backend S3Backend) Close() error {
- return nil
+func (s *S3BackendStorage) DeleteFile(key string) (err error) {
+
+ glog.V(1).Infof("delete dat file %s from remote", key)
+
+ err = deleteFromS3(s.conn, s.bucket, key)
+
+ return
}
-func (s3backend S3Backend) GetStat() (datSize int64, modTime time.Time, err error) {
+type S3BackendStorageFile struct {
+ backendStorage *S3BackendStorage
+ key string
+ tierInfo *volume_server_pb.VolumeInfo
+}
- headObjectOutput, headObjectErr := s3backend.conn.HeadObject(&s3.HeadObjectInput{
- Bucket: &s3backend.bucket,
- Key: &s3backend.key,
+func (s3backendStorageFile S3BackendStorageFile) ReadAt(p []byte, off int64) (n int, err error) {
+
+ bytesRange := fmt.Sprintf("bytes=%d-%d", off, off+int64(len(p))-1)
+
+ // glog.V(0).Infof("read %s %s", s3backendStorageFile.key, bytesRange)
+
+ getObjectOutput, getObjectErr := s3backendStorageFile.backendStorage.conn.GetObject(&s3.GetObjectInput{
+ Bucket: &s3backendStorageFile.backendStorage.bucket,
+ Key: &s3backendStorageFile.key,
+ Range: &bytesRange,
})
- if headObjectErr != nil {
- return 0, time.Now(), fmt.Errorf("bucket %s HeadObject %s: %v", s3backend.bucket, s3backend.key, headObjectErr)
+ if getObjectErr != nil {
+ return 0, fmt.Errorf("bucket %s GetObject %s: %v", s3backendStorageFile.backendStorage.bucket, s3backendStorageFile.key, getObjectErr)
+ }
+ defer getObjectOutput.Body.Close()
+
+ glog.V(4).Infof("read %s %s", s3backendStorageFile.key, bytesRange)
+ glog.V(4).Infof("content range: %s, contentLength: %d", *getObjectOutput.ContentRange, *getObjectOutput.ContentLength)
+
+ for {
+ if n, err = getObjectOutput.Body.Read(p); err == nil && n < len(p) {
+ p = p[n:]
+ } else {
+ break
+ }
}
- datSize = int64(*headObjectOutput.ContentLength)
- modTime = *headObjectOutput.LastModified
+ if err == io.EOF {
+ err = nil
+ }
return
}
-func (s3backend S3Backend) String() string {
- return fmt.Sprintf("%s/%s", s3backend.bucket, s3backend.key)
+func (s3backendStorageFile S3BackendStorageFile) WriteAt(p []byte, off int64) (n int, err error) {
+ panic("not implemented")
}
-func (s3backend *S3Backend) GetName() string {
- return "s3"
+func (s3backendStorageFile S3BackendStorageFile) Truncate(off int64) error {
+ panic("not implemented")
}
-func (s3backend *S3Backend) GetSinkToDirectory() string {
- return s3backend.dir
+func (s3backendStorageFile S3BackendStorageFile) Close() error {
+ return nil
}
-func (s3backend *S3Backend) Initialize(configuration util.Configuration, vid needle.VolumeId) error {
- glog.V(0).Infof("storage.backend.s3.region: %v", configuration.GetString("region"))
- glog.V(0).Infof("storage.backend.s3.bucket: %v", configuration.GetString("bucket"))
- glog.V(0).Infof("storage.backend.s3.directory: %v", configuration.GetString("directory"))
+func (s3backendStorageFile S3BackendStorageFile) GetStat() (datSize int64, modTime time.Time, err error) {
- return s3backend.initialize(
- configuration.GetString("aws_access_key_id"),
- configuration.GetString("aws_secret_access_key"),
- configuration.GetString("region"),
- configuration.GetString("bucket"),
- configuration.GetString("directory"),
- vid,
- )
-}
-
-func (s3backend *S3Backend) initialize(awsAccessKeyId, awsSecretAccessKey, region, bucket, dir string,
- vid needle.VolumeId) (err error) {
- s3backend.region = region
- s3backend.bucket = bucket
- s3backend.dir = dir
- s3backend.conn, err = createSession(awsAccessKeyId, awsSecretAccessKey, region)
+ files := s3backendStorageFile.tierInfo.GetFiles()
- s3backend.vid = vid
- s3backend.key = fmt.Sprintf("%s/%d.dat", dir, vid)
- if strings.HasPrefix(s3backend.key, "/") {
- s3backend.key = s3backend.key[1:]
+ if len(files) == 0 {
+ err = fmt.Errorf("remote file info not found")
+ return
}
- return err
+ datSize = int64(files[0].FileSize)
+ modTime = time.Unix(int64(files[0].ModifiedTime), 0)
+
+ return
+}
+
+func (s3backendStorageFile S3BackendStorageFile) Name() string {
+ return s3backendStorageFile.key
}
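
The loop in ReadAt above exists because GetObject returns a stream: a single Body.Read may deliver fewer bytes than the ranged request covers, so the code keeps reading until the buffer is full or the body ends. The same contract can be written with io.ReadFull; a sketch (readFullFromBody is a hypothetical helper, not in this change):

// readFullFromBody fills p from a ranged-GET body, treating a short object
// as a normal end of data rather than an error; it mirrors the loop above.
func readFullFromBody(body io.Reader, p []byte) (n int, err error) {
	n, err = io.ReadFull(body, p)
	if err == io.EOF || err == io.ErrUnexpectedEOF {
		err = nil // the object ended before the buffer was full
	}
	return
}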
diff --git a/weed/storage/backend/s3_backend/s3_download.go b/weed/storage/backend/s3_backend/s3_download.go
new file mode 100644
index 000000000..dbc28446a
--- /dev/null
+++ b/weed/storage/backend/s3_backend/s3_download.go
@@ -0,0 +1,98 @@
+package s3_backend
+
+import (
+ "fmt"
+ "os"
+ "sync/atomic"
+
+ "github.com/aws/aws-sdk-go/aws"
+ "github.com/aws/aws-sdk-go/service/s3"
+ "github.com/aws/aws-sdk-go/service/s3/s3iface"
+ "github.com/aws/aws-sdk-go/service/s3/s3manager"
+
+ "github.com/chrislusf/seaweedfs/weed/glog"
+)
+
+func downloadFromS3(sess s3iface.S3API, destFileName string, sourceBucket string, sourceKey string,
+ fn func(progressed int64, percentage float32) error) (fileSize int64, err error) {
+
+ fileSize, err = getFileSize(sess, sourceBucket, sourceKey)
+ if err != nil {
+ return
+ }
+
+ //open the file
+ f, err := os.OpenFile(destFileName, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644)
+ if err != nil {
+ return 0, fmt.Errorf("failed to open file %q, %v", destFileName, err)
+ }
+ defer f.Close()
+
+ // Create a downloader with the session and custom options
+ downloader := s3manager.NewDownloaderWithClient(sess, func(u *s3manager.Downloader) {
+ u.PartSize = int64(64 * 1024 * 1024)
+ u.Concurrency = 5
+ })
+
+ fileWriter := &s3DownloadProgressedWriter{
+ fp: f,
+ size: fileSize,
+ written: 0,
+ fn: fn,
+ }
+
+ // Download the file from S3.
+ fileSize, err = downloader.Download(fileWriter, &s3.GetObjectInput{
+ Bucket: aws.String(sourceBucket),
+ Key: aws.String(sourceKey),
+ })
+ if err != nil {
+ return fileSize, fmt.Errorf("failed to download file %s: %v", destFileName, err)
+ }
+
+ glog.V(1).Infof("downloaded file %s\n", destFileName)
+
+ return
+}
+
+// adapted from https://github.com/aws/aws-sdk-go/pull/1868
+// and https://petersouter.xyz/s3-download-progress-bar-in-golang/
+type s3DownloadProgressedWriter struct {
+ fp *os.File
+ size int64
+ written int64
+ fn func(progressed int64, percentage float32) error
+}
+
+func (w *s3DownloadProgressedWriter) WriteAt(p []byte, off int64) (int, error) {
+ n, err := w.fp.WriteAt(p, off)
+ if err != nil {
+ return n, err
+ }
+
+ // track the total bytes written so far, then report progress through the callback
+ atomic.AddInt64(&w.written, int64(n))
+
+ if w.fn != nil {
+ written := w.written
+ if err := w.fn(written, float32(written*100)/float32(w.size)); err != nil {
+ return n, err
+ }
+ }
+
+ return n, err
+}
+
+func getFileSize(svc s3iface.S3API, bucket string, key string) (filesize int64, error error) {
+ params := &s3.HeadObjectInput{
+ Bucket: aws.String(bucket),
+ Key: aws.String(key),
+ }
+
+ resp, err := svc.HeadObject(params)
+ if err != nil {
+ return 0, err
+ }
+
+ return *resp.ContentLength, nil
+}
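
downloadFromS3 streams through s3manager with 64MB parts and five concurrent workers, and surfaces progress through the fn callback; returning a non-nil error from the callback aborts the transfer at the next WriteAt. A hedged caller sketch, assuming it sits in this package (the log wording is illustrative):

func downloadWithLogging(sess s3iface.S3API, destFile, bucket, key string) (int64, error) {
	return downloadFromS3(sess, destFile, bucket, key,
		func(progressed int64, percentage float32) error {
			glog.V(1).Infof("downloaded %d bytes (%.1f%%)", progressed, percentage)
			return nil // a non-nil error here cancels the download
		})
}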
diff --git a/weed/storage/backend/s3_backend/s3_sessions.go b/weed/storage/backend/s3_backend/s3_sessions.go
index cd7b7ad47..5fdbcb66b 100644
--- a/weed/storage/backend/s3_backend/s3_sessions.go
+++ b/weed/storage/backend/s3_backend/s3_sessions.go
@@ -52,3 +52,11 @@ func createSession(awsAccessKeyId, awsSecretAccessKey, region string) (s3iface.S
return t, nil
}
+
+func deleteFromS3(sess s3iface.S3API, sourceBucket string, sourceKey string) (err error) {
+ _, err = sess.DeleteObject(&s3.DeleteObjectInput{
+ Bucket: aws.String(sourceBucket),
+ Key: aws.String(sourceKey),
+ })
+ return err
+}
diff --git a/weed/storage/backend/s3_backend/s3_upload.go b/weed/storage/backend/s3_backend/s3_upload.go
new file mode 100644
index 000000000..500a85590
--- /dev/null
+++ b/weed/storage/backend/s3_backend/s3_upload.go
@@ -0,0 +1,114 @@
+package s3_backend
+
+import (
+ "fmt"
+ "os"
+ "sync/atomic"
+
+ "github.com/aws/aws-sdk-go/aws"
+ "github.com/aws/aws-sdk-go/service/s3/s3iface"
+ "github.com/aws/aws-sdk-go/service/s3/s3manager"
+
+ "github.com/chrislusf/seaweedfs/weed/glog"
+)
+
+func uploadToS3(sess s3iface.S3API, filename string, destBucket string, destKey string,
+ attributes map[string]string,
+ fn func(progressed int64, percentage float32) error) (fileSize int64, err error) {
+
+ //open the file
+ f, err := os.Open(filename)
+ if err != nil {
+ return 0, fmt.Errorf("failed to open file %q, %v", filename, err)
+ }
+ defer f.Close()
+
+ info, err := f.Stat()
+ if err != nil {
+ return 0, fmt.Errorf("failed to stat file %q, %v", filename, err)
+ }
+
+ fileSize = info.Size()
+
+ partSize := int64(64 * 1024 * 1024) // The minimum/default allowed part size is 5MB
+ for partSize*1000 < fileSize {
+ partSize *= 4
+ }
+
+ // Create an uploader with the session and custom options
+ uploader := s3manager.NewUploaderWithClient(sess, func(u *s3manager.Uploader) {
+ u.PartSize = partSize
+ u.Concurrency = 5
+ })
+
+ fileReader := &s3UploadProgressedReader{
+ fp: f,
+ size: fileSize,
+ read: -fileSize,
+ fn: fn,
+ }
+
+ // process tagging
+ tags := ""
+ for k, v := range attributes {
+ if len(tags) > 0 {
+ tags = tags + "&"
+ }
+ tags = tags + k + "=" + v
+ }
+
+ // Upload the file to S3.
+ var result *s3manager.UploadOutput
+ result, err = uploader.Upload(&s3manager.UploadInput{
+ Bucket: aws.String(destBucket),
+ Key: aws.String(destKey),
+ Body: fileReader,
+ ACL: aws.String("private"),
+ ServerSideEncryption: aws.String("AES256"),
+ StorageClass: aws.String("STANDARD_IA"),
+ Tagging: aws.String(tags),
+ })
+
+ //in case it fails to upload
+ if err != nil {
+ return 0, fmt.Errorf("failed to upload file %s: %v", filename, err)
+ }
+ glog.V(1).Infof("file %s uploaded to %s\n", filename, result.Location)
+
+ return
+}
+
+// adapted from https://github.com/aws/aws-sdk-go/pull/1868
+type s3UploadProgressedReader struct {
+ fp *os.File
+ size int64
+ read int64
+ fn func(progressed int64, percentage float32) error
+}
+
+func (r *s3UploadProgressedReader) Read(p []byte) (int, error) {
+ return r.fp.Read(p)
+}
+
+func (r *s3UploadProgressedReader) ReadAt(p []byte, off int64) (int, error) {
+ n, err := r.fp.ReadAt(p, off)
+ if err != nil {
+ return n, err
+ }
+
+ // track the total bytes read (i.e. uploaded) so far, then report progress through the callback
+ atomic.AddInt64(&r.read, int64(n))
+
+ if r.fn != nil {
+ read := r.read
+ if err := r.fn(read, float32(read*100)/float32(r.size)); err != nil {
+ return n, err
+ }
+ }
+
+ return n, err
+}
+
+func (r *s3UploadProgressedReader) Seek(offset int64, whence int) (int64, error) {
+ return r.fp.Seek(offset, whence)
+}
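
The part-size loop above quadruples the 64MB default until the file fits in roughly 1000 parts, keeping uploads comfortably under S3's 10,000-part cap. Isolated as a sketch with a worked example:

// computePartSize mirrors the sizing loop in uploadToS3 above: grow the part
// size until fileSize fits in about 1000 parts (S3 allows up to 10,000).
func computePartSize(fileSize int64) int64 {
	partSize := int64(64 * 1024 * 1024) // 64MB starting point
	for partSize*1000 < fileSize {
		partSize *= 4 // 64MB -> 256MB -> 1GB -> ...
	}
	return partSize
}

// e.g. a 100GB file: 64MB*1000 < 100GB, so one *4 step yields 256MB parts,
// about 400 parts in total.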
diff --git a/weed/storage/disk_location.go b/weed/storage/disk_location.go
index c7faa57a6..f15303282 100644
--- a/weed/storage/disk_location.go
+++ b/weed/storage/disk_location.go
@@ -1,13 +1,12 @@
package storage
import (
+ "fmt"
"io/ioutil"
"os"
"strings"
"sync"
- "fmt"
-
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/storage/erasure_coding"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
@@ -17,7 +16,7 @@ type DiskLocation struct {
Directory string
MaxVolumeCount int
volumes map[needle.VolumeId]*Volume
- sync.RWMutex
+ volumesLock sync.RWMutex
// erasure coding
ecVolumes map[needle.VolumeId]*erasure_coding.EcVolume
@@ -33,8 +32,8 @@ func NewDiskLocation(dir string, maxVolumeCount int) *DiskLocation {
func (l *DiskLocation) volumeIdFromPath(dir os.FileInfo) (needle.VolumeId, string, error) {
name := dir.Name()
- if !dir.IsDir() && strings.HasSuffix(name, ".dat") {
- base := name[:len(name)-len(".dat")]
+ if !dir.IsDir() && strings.HasSuffix(name, ".idx") {
+ base := name[:len(name)-len(".idx")]
collection, volumeId, err := parseCollectionVolumeId(base)
return volumeId, collection, err
}
@@ -51,30 +50,39 @@ func parseCollectionVolumeId(base string) (collection string, vid needle.VolumeI
return collection, vol, err
}
-func (l *DiskLocation) loadExistingVolume(fileInfo os.FileInfo, needleMapKind NeedleMapType) {
+func (l *DiskLocation) loadExistingVolume(fileInfo os.FileInfo, needleMapKind NeedleMapType) bool {
name := fileInfo.Name()
- if !fileInfo.IsDir() && strings.HasSuffix(name, ".dat") {
+ if !fileInfo.IsDir() && strings.HasSuffix(name, ".idx") {
vid, collection, err := l.volumeIdFromPath(fileInfo)
- if err == nil {
- l.RLock()
- _, found := l.volumes[vid]
- l.RUnlock()
- if !found {
- if v, e := NewVolume(l.Directory, collection, vid, needleMapKind, nil, nil, 0, 0); e == nil {
- l.Lock()
- l.volumes[vid] = v
- l.Unlock()
- size, _, _ := v.FileStat()
- glog.V(0).Infof("data file %s, replicaPlacement=%s v=%d size=%d ttl=%s",
- l.Directory+"/"+name, v.ReplicaPlacement, v.Version(), size, v.Ttl.String())
- // println("volume", vid, "last append at", v.lastAppendAtNs)
- } else {
- glog.V(0).Infof("new volume %s error %s", name, e)
- }
+ if err != nil {
+ glog.Warningf("get volume id failed, %s, err : %s", name, err)
+ return false
+ }
- }
+ // avoid loading one volume more than once
+ l.volumesLock.RLock()
+ _, found := l.volumes[vid]
+ l.volumesLock.RUnlock()
+ if found {
+ glog.V(1).Infof("loaded volume, %v", vid)
+ return true
+ }
+
+ v, e := NewVolume(l.Directory, collection, vid, needleMapKind, nil, nil, 0, 0)
+ if e != nil {
+ glog.V(0).Infof("new volume %s error %s", name, e)
+ return false
}
+
+ l.volumesLock.Lock()
+ l.volumes[vid] = v
+ l.volumesLock.Unlock()
+ size, _, _ := v.FileStat()
+ glog.V(0).Infof("data file %s, replicaPlacement=%s v=%d size=%d ttl=%s",
+ l.Directory+"/"+name, v.ReplicaPlacement, v.Version(), size, v.Ttl.String())
+ return true
}
+ return false
}
func (l *DiskLocation) concurrentLoadingVolumes(needleMapKind NeedleMapType, concurrency int) {
@@ -95,7 +103,7 @@ func (l *DiskLocation) concurrentLoadingVolumes(needleMapKind NeedleMapType, con
go func() {
defer wg.Done()
for dir := range task_queue {
- l.loadExistingVolume(dir, needleMapKind)
+ _ = l.loadExistingVolume(dir, needleMapKind)
}
}()
}
@@ -115,29 +123,46 @@ func (l *DiskLocation) loadExistingVolumes(needleMapKind NeedleMapType) {
func (l *DiskLocation) DeleteCollectionFromDiskLocation(collection string) (e error) {
- l.Lock()
- for k, v := range l.volumes {
- if v.Collection == collection {
- e = l.deleteVolumeById(k)
- if e != nil {
- l.Unlock()
- return
- }
- }
- }
- l.Unlock()
+ l.volumesLock.Lock()
+ delVolsMap := l.unmountVolumeByCollection(collection)
+ l.volumesLock.Unlock()
l.ecVolumesLock.Lock()
- for k, v := range l.ecVolumes {
- if v.Collection == collection {
- e = l.deleteEcVolumeById(k)
- if e != nil {
- l.ecVolumesLock.Unlock()
- return
+ delEcVolsMap := l.unmountEcVolumeByCollection(collection)
+ l.ecVolumesLock.Unlock()
+
+ errChain := make(chan error, 2)
+ var wg sync.WaitGroup
+ wg.Add(2)
+ go func() {
+ for _, v := range delVolsMap {
+ if err := v.Destroy(); err != nil {
+ errChain <- err
}
}
+ wg.Done()
+ }()
+
+ go func() {
+ for _, v := range delEcVolsMap {
+ v.Destroy()
+ }
+ wg.Done()
+ }()
+
+ go func() {
+ wg.Wait()
+ close(errChain)
+ }()
+
+ errBuilder := strings.Builder{}
+ for err := range errChain {
+ errBuilder.WriteString(err.Error())
+ errBuilder.WriteString("; ")
+ }
+ if errBuilder.Len() > 0 {
+ e = fmt.Errorf(errBuilder.String())
}
- l.ecVolumesLock.Unlock()
return
}
@@ -156,22 +181,15 @@ func (l *DiskLocation) deleteVolumeById(vid needle.VolumeId) (e error) {
}
func (l *DiskLocation) LoadVolume(vid needle.VolumeId, needleMapKind NeedleMapType) bool {
- if fileInfos, err := ioutil.ReadDir(l.Directory); err == nil {
- for _, fileInfo := range fileInfos {
- volId, _, err := l.volumeIdFromPath(fileInfo)
- if vid == volId && err == nil {
- l.loadExistingVolume(fileInfo, needleMapKind)
- return true
- }
- }
+ if fileInfo, found := l.LocateVolume(vid); found {
+ return l.loadExistingVolume(fileInfo, needleMapKind)
}
-
return false
}
func (l *DiskLocation) DeleteVolume(vid needle.VolumeId) error {
- l.Lock()
- defer l.Unlock()
+ l.volumesLock.Lock()
+ defer l.volumesLock.Unlock()
_, ok := l.volumes[vid]
if !ok {
@@ -181,8 +199,8 @@ func (l *DiskLocation) DeleteVolume(vid needle.VolumeId) error {
}
func (l *DiskLocation) UnloadVolume(vid needle.VolumeId) error {
- l.Lock()
- defer l.Unlock()
+ l.volumesLock.Lock()
+ defer l.volumesLock.Unlock()
v, ok := l.volumes[vid]
if !ok {
@@ -193,34 +211,48 @@ func (l *DiskLocation) UnloadVolume(vid needle.VolumeId) error {
return nil
}
+func (l *DiskLocation) unmountVolumeByCollection(collectionName string) map[needle.VolumeId]*Volume {
+ deltaVols := make(map[needle.VolumeId]*Volume, 0)
+ for k, v := range l.volumes {
+ if v.Collection == collectionName && !v.isCompacting {
+ deltaVols[k] = v
+ }
+ }
+
+ for k := range deltaVols {
+ delete(l.volumes, k)
+ }
+ return deltaVols
+}
+
func (l *DiskLocation) SetVolume(vid needle.VolumeId, volume *Volume) {
- l.Lock()
- defer l.Unlock()
+ l.volumesLock.Lock()
+ defer l.volumesLock.Unlock()
l.volumes[vid] = volume
}
func (l *DiskLocation) FindVolume(vid needle.VolumeId) (*Volume, bool) {
- l.RLock()
- defer l.RUnlock()
+ l.volumesLock.RLock()
+ defer l.volumesLock.RUnlock()
v, ok := l.volumes[vid]
return v, ok
}
func (l *DiskLocation) VolumesLen() int {
- l.RLock()
- defer l.RUnlock()
+ l.volumesLock.RLock()
+ defer l.volumesLock.RUnlock()
return len(l.volumes)
}
func (l *DiskLocation) Close() {
- l.Lock()
+ l.volumesLock.Lock()
for _, v := range l.volumes {
v.Close()
}
- l.Unlock()
+ l.volumesLock.Unlock()
l.ecVolumesLock.Lock()
for _, ecVolume := range l.ecVolumes {
@@ -230,3 +262,16 @@ func (l *DiskLocation) Close() {
return
}
+
+func (l *DiskLocation) LocateVolume(vid needle.VolumeId) (os.FileInfo, bool) {
+ if fileInfos, err := ioutil.ReadDir(l.Directory); err == nil {
+ for _, fileInfo := range fileInfos {
+ volId, _, err := l.volumeIdFromPath(fileInfo)
+ if vid == volId && err == nil {
+ return fileInfo, true
+ }
+ }
+ }
+
+ return nil, false
+}
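
DeleteCollectionFromDiskLocation now detaches matching volumes under each lock, then destroys them outside the locks on two goroutines, joining any failures into a single error via a channel that is closed once both workers finish. The same fan-out/collect idiom in isolation (gatherErrors is a hypothetical helper, assuming imports errors, strings, and sync):

// gatherErrors runs the given tasks concurrently and folds any failures into
// one error; a minimal standalone sketch of the pattern used above.
func gatherErrors(tasks ...func() error) error {
	errChain := make(chan error, len(tasks))
	var wg sync.WaitGroup
	for _, task := range tasks {
		wg.Add(1)
		go func(t func() error) {
			defer wg.Done()
			if err := t(); err != nil {
				errChain <- err
			}
		}(task)
	}
	go func() {
		wg.Wait()
		close(errChain) // unblocks the range below once all tasks finish
	}()

	b := strings.Builder{}
	for err := range errChain {
		b.WriteString(err.Error())
		b.WriteString("; ")
	}
	if b.Len() > 0 {
		return errors.New(b.String())
	}
	return nil
}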
diff --git a/weed/storage/disk_location_ec.go b/weed/storage/disk_location_ec.go
index ba0824c6d..f6c44e966 100644
--- a/weed/storage/disk_location_ec.go
+++ b/weed/storage/disk_location_ec.go
@@ -169,3 +169,17 @@ func (l *DiskLocation) deleteEcVolumeById(vid needle.VolumeId) (e error) {
delete(l.ecVolumes, vid)
return
}
+
+func (l *DiskLocation) unmountEcVolumeByCollection(collectionName string) map[needle.VolumeId]*erasure_coding.EcVolume {
+ deltaVols := make(map[needle.VolumeId]*erasure_coding.EcVolume, 0)
+ for k, v := range l.ecVolumes {
+ if v.Collection == collectionName {
+ deltaVols[k] = v
+ }
+ }
+
+ for k := range deltaVols {
+ delete(l.ecVolumes, k)
+ }
+ return deltaVols
+}
diff --git a/weed/storage/erasure_coding/ec_decoder.go b/weed/storage/erasure_coding/ec_decoder.go
new file mode 100644
index 000000000..ae77cee3f
--- /dev/null
+++ b/weed/storage/erasure_coding/ec_decoder.go
@@ -0,0 +1,198 @@
+package erasure_coding
+
+import (
+ "fmt"
+ "io"
+ "os"
+
+ "github.com/chrislusf/seaweedfs/weed/storage/backend"
+ "github.com/chrislusf/seaweedfs/weed/storage/idx"
+ "github.com/chrislusf/seaweedfs/weed/storage/needle"
+ "github.com/chrislusf/seaweedfs/weed/storage/needle_map"
+ "github.com/chrislusf/seaweedfs/weed/storage/super_block"
+ "github.com/chrislusf/seaweedfs/weed/storage/types"
+)
+
+// write .idx file from .ecx and .ecj files
+func WriteIdxFileFromEcIndex(baseFileName string) (err error) {
+
+ ecxFile, openErr := os.OpenFile(baseFileName+".ecx", os.O_RDONLY, 0644)
+ if openErr != nil {
+ return fmt.Errorf("cannot open ec index %s.ecx: %v", baseFileName, openErr)
+ }
+ defer ecxFile.Close()
+
+ idxFile, openErr := os.OpenFile(baseFileName+".idx", os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644)
+ if openErr != nil {
+ return fmt.Errorf("cannot open %s.idx: %v", baseFileName, openErr)
+ }
+ defer idxFile.Close()
+
+ if _, err = io.Copy(idxFile, ecxFile); err != nil {
+ return fmt.Errorf("copy %s.ecx to %s.idx: %v", baseFileName, baseFileName, err)
+ }
+
+ err = iterateEcjFile(baseFileName, func(key types.NeedleId) error {
+
+ bytes := needle_map.ToBytes(key, types.Offset{}, types.TombstoneFileSize)
+ idxFile.Write(bytes)
+
+ return nil
+ })
+
+ return err
+}
+
+// FindDatFileSize calculates the .dat file size from the entry with the max offset;
+// there may be extra deletion entries after that entry,
+// but deletions do not extend the file anyway
+func FindDatFileSize(baseFileName string) (datSize int64, err error) {
+
+ version, err := readEcVolumeVersion(baseFileName)
+ if err != nil {
+ return 0, fmt.Errorf("read ec volume %s version: %v", baseFileName, err)
+ }
+
+ err = iterateEcxFile(baseFileName, func(key types.NeedleId, offset types.Offset, size uint32) error {
+
+ if size == types.TombstoneFileSize {
+ return nil
+ }
+
+ entryStopOffset := offset.ToAcutalOffset() + needle.GetActualSize(size, version)
+ if datSize < entryStopOffset {
+ datSize = entryStopOffset
+ }
+
+ return nil
+ })
+
+ return
+}
+
+func readEcVolumeVersion(baseFileName string) (version needle.Version, err error) {
+
+ // find volume version
+ datFile, err := os.OpenFile(baseFileName+".ec00", os.O_RDONLY, 0644)
+ if err != nil {
+ return 0, fmt.Errorf("open ec volume %s superblock: %v", baseFileName, err)
+ }
+ datBackend := backend.NewDiskFile(datFile)
+
+ superBlock, err := super_block.ReadSuperBlock(datBackend)
+ datBackend.Close()
+ if err != nil {
+ return 0, fmt.Errorf("read ec volume %s superblock: %v", baseFileName, err)
+ }
+
+ return superBlock.Version, nil
+
+}
+
+func iterateEcxFile(baseFileName string, processNeedleFn func(key types.NeedleId, offset types.Offset, size uint32) error) error {
+ ecxFile, openErr := os.OpenFile(baseFileName+".ecx", os.O_RDONLY, 0644)
+ if openErr != nil {
+ return fmt.Errorf("cannot open ec index %s.ecx: %v", baseFileName, openErr)
+ }
+ defer ecxFile.Close()
+
+ buf := make([]byte, types.NeedleMapEntrySize)
+ for {
+ n, err := ecxFile.Read(buf)
+ if n != types.NeedleMapEntrySize {
+ if err == io.EOF {
+ return nil
+ }
+ return err
+ }
+ key, offset, size := idx.IdxFileEntry(buf)
+ if processNeedleFn != nil {
+ err = processNeedleFn(key, offset, size)
+ }
+ if err != nil {
+ if err != io.EOF {
+ return err
+ }
+ return nil
+ }
+ }
+
+}
+
+func iterateEcjFile(baseFileName string, processNeedleFn func(key types.NeedleId) error) error {
+ ecjFile, openErr := os.OpenFile(baseFileName+".ecj", os.O_RDONLY, 0644)
+ if openErr != nil {
+ return fmt.Errorf("cannot open ec journal %s.ecj: %v", baseFileName, openErr)
+ }
+ defer ecjFile.Close()
+
+ buf := make([]byte, types.NeedleIdSize)
+ for {
+ n, err := ecjFile.Read(buf)
+ if n != types.NeedleIdSize {
+ if err == io.EOF {
+ return nil
+ }
+ return err
+ }
+ if processNeedleFn != nil {
+ err = processNeedleFn(types.BytesToNeedleId(buf))
+ }
+ if err != nil {
+ if err == io.EOF {
+ return nil
+ }
+ return err
+ }
+ }
+
+}
+
+// WriteDatFile generates .dat from the .ec00 ~ .ec09 files
+func WriteDatFile(baseFileName string, datFileSize int64) error {
+
+ datFile, openErr := os.OpenFile(baseFileName+".dat", os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644)
+ if openErr != nil {
+ return fmt.Errorf("cannot write volume %s.dat: %v", baseFileName, openErr)
+ }
+ defer datFile.Close()
+
+ inputFiles := make([]*os.File, DataShardsCount)
+
+ for shardId := 0; shardId < DataShardsCount; shardId++ {
+ shardFileName := baseFileName + ToExt(shardId)
+ inputFiles[shardId], openErr = os.OpenFile(shardFileName, os.O_RDONLY, 0)
+ if openErr != nil {
+ return openErr
+ }
+ defer inputFiles[shardId].Close()
+ }
+
+ for datFileSize >= DataShardsCount*ErasureCodingLargeBlockSize {
+ for shardId := 0; shardId < DataShardsCount; shardId++ {
+ w, err := io.CopyN(datFile, inputFiles[shardId], ErasureCodingLargeBlockSize)
+ if w != ErasureCodingLargeBlockSize {
+ return fmt.Errorf("copy %s large block %d: %v", baseFileName, shardId, err)
+ }
+ datFileSize -= ErasureCodingLargeBlockSize
+ }
+ }
+
+ for datFileSize > 0 {
+ for shardId := 0; shardId < DataShardsCount; shardId++ {
+ toRead := min(datFileSize, ErasureCodingSmallBlockSize)
+ w, err := io.CopyN(datFile, inputFiles[shardId], toRead)
+ if w != toRead {
+ return fmt.Errorf("copy %s small block %d: %v", baseFileName, shardId, err)
+ }
+ datFileSize -= toRead
+ }
+ }
+
+ return nil
+}
+
+func min(x, y int64) int64 {
+ if x > y {
+ return y
+ }
+ return x
+}
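
WriteDatFile reverses the striping that encoding produced: full rows of large blocks round-robin across the ten data shards first, then the tail is striped in small blocks. Assuming the constants from ec_encoder.go (DataShardsCount = 10, 1GB large blocks, 1MB small blocks), a sketch of mapping a .dat offset back to its shard; locateShard is illustrative, not part of this change:

// locateShard maps an offset in the reconstructed .dat file to the data shard
// and in-shard offset it came from, under the block layout described above.
func locateShard(datOffset, datSize int64) (shardId int, shardOffset int64) {
	const large, small, shards = 1024 * 1024 * 1024, 1024 * 1024, 10
	largeRowSize := int64(shards) * large
	largeRows := datSize / largeRowSize // only full rows use large blocks
	if datOffset < largeRows*largeRowSize {
		row := datOffset / largeRowSize
		col := (datOffset % largeRowSize) / large
		return int(col), row*large + datOffset%large
	}
	// past the large-block region, data is striped in small blocks
	rest := datOffset - largeRows*largeRowSize
	smallRowSize := int64(shards) * small
	row := rest / smallRowSize
	col := (rest % smallRowSize) / small
	return int(col), largeRows*large + row*small + rest%small
}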
diff --git a/weed/storage/erasure_coding/ec_encoder.go b/weed/storage/erasure_coding/ec_encoder.go
index 97010a1ed..97c3ccbd9 100644
--- a/weed/storage/erasure_coding/ec_encoder.go
+++ b/weed/storage/erasure_coding/ec_encoder.go
@@ -5,12 +5,13 @@ import (
"io"
"os"
+ "github.com/klauspost/reedsolomon"
+
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/storage/idx"
"github.com/chrislusf/seaweedfs/weed/storage/needle_map"
"github.com/chrislusf/seaweedfs/weed/storage/types"
"github.com/chrislusf/seaweedfs/weed/util"
- "github.com/klauspost/reedsolomon"
)
const (
@@ -21,35 +22,38 @@ const (
ErasureCodingSmallBlockSize = 1024 * 1024 // 1MB
)
-// WriteSortedEcxFile generates .ecx file from existing .idx file
+// WriteSortedFileFromIdx generates .ecx file from existing .idx file
// all keys are sorted in ascending order
-func WriteSortedEcxFile(baseFileName string) (e error) {
+func WriteSortedFileFromIdx(baseFileName string, ext string) (e error) {
- cm, err := readCompactMap(baseFileName)
+ nm, err := readNeedleMap(baseFileName)
+ if nm != nil {
+ defer nm.Close()
+ }
if err != nil {
- return fmt.Errorf("readCompactMap: %v", err)
+ return fmt.Errorf("readNeedleMap: %v", err)
}
- ecxFile, err := os.OpenFile(baseFileName+".ecx", os.O_TRUNC|os.O_CREATE|os.O_WRONLY, 0644)
+ ecxFile, err := os.OpenFile(baseFileName+ext, os.O_TRUNC|os.O_CREATE|os.O_WRONLY, 0644)
if err != nil {
return fmt.Errorf("failed to open ecx file: %v", err)
}
defer ecxFile.Close()
- err = cm.AscendingVisit(func(value needle_map.NeedleValue) error {
+ err = nm.AscendingVisit(func(value needle_map.NeedleValue) error {
bytes := value.ToBytes()
_, writeErr := ecxFile.Write(bytes)
return writeErr
})
if err != nil {
- return fmt.Errorf("failed to visit ecx file: %v", err)
+ return fmt.Errorf("failed to visit idx file: %v", err)
}
return nil
}
-// WriteEcFiles generates .ec01 ~ .ec14 files
+// WriteEcFiles generates .ec00 ~ .ec13 files
func WriteEcFiles(baseFileName string) error {
return generateEcFiles(baseFileName, 256*1024, ErasureCodingLargeBlockSize, ErasureCodingSmallBlockSize)
}
@@ -195,7 +199,7 @@ func encodeDatFile(remainingSize int64, err error, baseFileName string, bufferSi
}
buffers := make([][]byte, TotalShardsCount)
- for i, _ := range buffers {
+ for i := range buffers {
buffers[i] = make([]byte, bufferSize)
}
@@ -232,7 +236,7 @@ func rebuildEcFiles(shardHasData []bool, inputFiles []*os.File, outputFiles []*o
}
buffers := make([][]byte, TotalShardsCount)
- for i, _ := range buffers {
+ for i := range buffers {
if shardHasData[i] {
buffers[i] = make([]byte, ErasureCodingSmallBlockSize)
}
@@ -280,14 +284,14 @@ func rebuildEcFiles(shardHasData []bool, inputFiles []*os.File, outputFiles []*o
}
-func readCompactMap(baseFileName string) (*needle_map.CompactMap, error) {
+func readNeedleMap(baseFileName string) (*needle_map.MemDb, error) {
indexFile, err := os.OpenFile(baseFileName+".idx", os.O_RDONLY, 0644)
if err != nil {
return nil, fmt.Errorf("cannot read Volume Index %s.idx: %v", baseFileName, err)
}
defer indexFile.Close()
- cm := needle_map.NewCompactMap()
+ cm := needle_map.NewMemDb()
err = idx.WalkIndexFile(indexFile, func(key types.NeedleId, offset types.Offset, size uint32) error {
if !offset.IsZero() && size != types.TombstoneFileSize {
cm.Set(key, offset, size)
diff --git a/weed/storage/erasure_coding/ec_test.go b/weed/storage/erasure_coding/ec_test.go
index 57df09525..92b83cdc8 100644
--- a/weed/storage/erasure_coding/ec_test.go
+++ b/weed/storage/erasure_coding/ec_test.go
@@ -7,9 +7,10 @@ import (
"os"
"testing"
+ "github.com/klauspost/reedsolomon"
+
"github.com/chrislusf/seaweedfs/weed/storage/needle_map"
"github.com/chrislusf/seaweedfs/weed/storage/types"
- "github.com/klauspost/reedsolomon"
)
const (
@@ -26,14 +27,14 @@ func TestEncodingDecoding(t *testing.T) {
t.Logf("generateEcFiles: %v", err)
}
- err = WriteSortedEcxFile(baseFileName)
+ err = WriteSortedFileFromIdx(baseFileName, ".ecx")
if err != nil {
- t.Logf("WriteSortedEcxFile: %v", err)
+ t.Logf("WriteSortedFileFromIdx: %v", err)
}
err = validateFiles(baseFileName)
if err != nil {
- t.Logf("WriteSortedEcxFile: %v", err)
+ t.Logf("WriteSortedFileFromIdx: %v", err)
}
removeGeneratedFiles(baseFileName)
@@ -41,9 +42,10 @@ func TestEncodingDecoding(t *testing.T) {
}
func validateFiles(baseFileName string) error {
- cm, err := readCompactMap(baseFileName)
+ nm, err := readNeedleMap(baseFileName)
+ defer nm.Close()
if err != nil {
- return fmt.Errorf("readCompactMap: %v", err)
+ return fmt.Errorf("readNeedleMap: %v", err)
}
datFile, err := os.OpenFile(baseFileName+".dat", os.O_RDONLY, 0)
@@ -60,7 +62,7 @@ func validateFiles(baseFileName string) error {
ecFiles, err := openEcFiles(baseFileName, true)
defer closeEcFiles(ecFiles)
- err = cm.AscendingVisit(func(value needle_map.NeedleValue) error {
+ err = nm.AscendingVisit(func(value needle_map.NeedleValue) error {
return assertSame(datFile, fi.Size(), ecFiles, value.Offset, value.Size)
})
if err != nil {
diff --git a/weed/storage/erasure_coding/ec_volume.go b/weed/storage/erasure_coding/ec_volume.go
index bcae164ca..3d9aa2cff 100644
--- a/weed/storage/erasure_coding/ec_volume.go
+++ b/weed/storage/erasure_coding/ec_volume.go
@@ -9,7 +9,9 @@ import (
"sync"
"time"
+ "github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
+ "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
"github.com/chrislusf/seaweedfs/weed/storage/idx"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
"github.com/chrislusf/seaweedfs/weed/storage/types"
@@ -56,6 +58,14 @@ func NewEcVolume(dir string, collection string, vid needle.VolumeId) (ev *EcVolu
return nil, fmt.Errorf("cannot open ec volume journal %s.ecj: %v", baseFileName, err)
}
+ // read volume info
+ ev.Version = needle.Version3
+ if volumeInfo, found, _ := pb.MaybeLoadVolumeInfo(baseFileName + ".vif"); found {
+ ev.Version = needle.Version(volumeInfo.Version)
+ } else {
+ pb.SaveVolumeInfo(baseFileName+".vif", &volume_server_pb.VolumeInfo{Version: uint32(ev.Version)})
+ }
+
ev.ShardLocations = make(map[ShardId][]string)
return
@@ -126,6 +136,7 @@ func (ev *EcVolume) Destroy() {
}
os.Remove(ev.FileName() + ".ecx")
os.Remove(ev.FileName() + ".ecj")
+ os.Remove(ev.FileName() + ".vif")
}
func (ev *EcVolume) FileName() string {
@@ -186,10 +197,10 @@ func (ev *EcVolume) LocateEcShardNeedle(needleId types.NeedleId, version needle.
}
func (ev *EcVolume) FindNeedleFromEcx(needleId types.NeedleId) (offset types.Offset, size uint32, err error) {
- return searchNeedleFromEcx(ev.ecxFile, ev.ecxFileSize, needleId, nil)
+ return SearchNeedleFromSortedIndex(ev.ecxFile, ev.ecxFileSize, needleId, nil)
}
-func searchNeedleFromEcx(ecxFile *os.File, ecxFileSize int64, needleId types.NeedleId, processNeedleFn func(file *os.File, offset int64) error) (offset types.Offset, size uint32, err error) {
+func SearchNeedleFromSortedIndex(ecxFile *os.File, ecxFileSize int64, needleId types.NeedleId, processNeedleFn func(file *os.File, offset int64) error) (offset types.Offset, size uint32, err error) {
var key types.NeedleId
buf := make([]byte, types.NeedleMapEntrySize)
l, h := int64(0), ecxFileSize/types.NeedleMapEntrySize
diff --git a/weed/storage/erasure_coding/ec_volume_delete.go b/weed/storage/erasure_coding/ec_volume_delete.go
index 04102ec9e..822a9e923 100644
--- a/weed/storage/erasure_coding/ec_volume_delete.go
+++ b/weed/storage/erasure_coding/ec_volume_delete.go
@@ -10,15 +10,15 @@ import (
)
var (
- markNeedleDeleted = func(file *os.File, offset int64) error {
+ MarkNeedleDeleted = func(file *os.File, offset int64) error {
b := make([]byte, types.SizeSize)
util.Uint32toBytes(b, types.TombstoneFileSize)
n, err := file.WriteAt(b, offset+types.NeedleIdSize+types.OffsetSize)
if err != nil {
- return fmt.Errorf("ecx write error: %v", err)
+ return fmt.Errorf("sorted needle write error: %v", err)
}
if n != types.SizeSize {
- return fmt.Errorf("ecx written %d bytes, expecting %d", n, types.SizeSize)
+ return fmt.Errorf("sorted needle written %d bytes, expecting %d", n, types.SizeSize)
}
return nil
}
@@ -26,7 +26,7 @@ var (
func (ev *EcVolume) DeleteNeedleFromEcx(needleId types.NeedleId) (err error) {
- _, _, err = searchNeedleFromEcx(ev.ecxFile, ev.ecxFileSize, needleId, markNeedleDeleted)
+ _, _, err = SearchNeedleFromSortedIndex(ev.ecxFile, ev.ecxFileSize, needleId, MarkNeedleDeleted)
if err != nil {
if err == NotFoundError {
@@ -81,7 +81,7 @@ func RebuildEcxFile(baseFileName string) error {
needleId := types.BytesToNeedleId(buf)
- _, _, err = searchNeedleFromEcx(ecxFile, ecxFileSize, needleId, markNeedleDeleted)
+ _, _, err = SearchNeedleFromSortedIndex(ecxFile, ecxFileSize, needleId, MarkNeedleDeleted)
if err != nil && err != NotFoundError {
ecxFile.Close()
diff --git a/weed/storage/erasure_coding/ec_volume_info.go b/weed/storage/erasure_coding/ec_volume_info.go
index c9e85c662..8ff65bb0f 100644
--- a/weed/storage/erasure_coding/ec_volume_info.go
+++ b/weed/storage/erasure_coding/ec_volume_info.go
@@ -81,6 +81,15 @@ func (b ShardBits) ShardIds() (ret []ShardId) {
return
}
+func (b ShardBits) ToUint32Slice() (ret []uint32) {
+ for i := uint32(0); i < TotalShardsCount; i++ {
+ if b.HasShardId(ShardId(i)) {
+ ret = append(ret, i)
+ }
+ }
+ return
+}
+
func (b ShardBits) ShardIdCount() (count int) {
for count = 0; b > 0; count++ {
b &= b - 1
@@ -95,3 +104,10 @@ func (b ShardBits) Minus(other ShardBits) ShardBits {
func (b ShardBits) Plus(other ShardBits) ShardBits {
return b | other
}
+
+func (b ShardBits) MinusParityShards() ShardBits {
+ for i := DataShardsCount; i < TotalShardsCount; i++ {
+ b = b.RemoveShardId(ShardId(i))
+ }
+ return b
+}
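
ShardBits stores shard membership as a bitmask, so Plus is a bitwise OR, ShardIdCount clears the lowest set bit per iteration, and the new MinusParityShards simply strips bits 10..13. A small walk-through, assuming the existing AddShardId method (not shown in this diff) sets bit id:

func exampleShardBits() {
	var b erasure_coding.ShardBits
	b = b.AddShardId(0).AddShardId(3).AddShardId(12)
	fmt.Println(b.ShardIdCount())      // 3
	fmt.Println(b.ToUint32Slice())     // [0 3 12]
	fmt.Println(b.MinusParityShards()) // bit 12 cleared: shards 10..13 are parity
}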
diff --git a/weed/storage/needle/crc.go b/weed/storage/needle/crc.go
index 00ea1db69..6fd910bb7 100644
--- a/weed/storage/needle/crc.go
+++ b/weed/storage/needle/crc.go
@@ -1,11 +1,11 @@
package needle
import (
- "crypto/md5"
"fmt"
- "github.com/chrislusf/seaweedfs/weed/util"
"github.com/klauspost/crc32"
+
+ "github.com/chrislusf/seaweedfs/weed/util"
)
var table = crc32.MakeTable(crc32.Castagnoli)
@@ -29,13 +29,3 @@ func (n *Needle) Etag() string {
util.Uint32toBytes(bits, uint32(n.Checksum))
return fmt.Sprintf("%x", bits)
}
-
-func (n *Needle) MD5() string {
-
- hash := md5.New()
-
- hash.Write(n.Data)
-
- return fmt.Sprintf("%x", hash.Sum(nil))
-
-}
diff --git a/weed/storage/needle/needle.go b/weed/storage/needle/needle.go
index 2f03ba87b..d3969e868 100644
--- a/weed/storage/needle/needle.go
+++ b/weed/storage/needle/needle.go
@@ -8,8 +8,6 @@ import (
"strings"
"time"
- "io/ioutil"
-
"github.com/chrislusf/seaweedfs/weed/images"
. "github.com/chrislusf/seaweedfs/weed/storage/types"
)
@@ -50,53 +48,28 @@ func (n *Needle) String() (str string) {
return
}
-func ParseUpload(r *http.Request) (
- fileName string, data []byte, mimeType string, pairMap map[string]string, isGzipped bool, originalDataSize int,
- modifiedTime uint64, ttl *TTL, isChunkedFile bool, e error) {
- pairMap = make(map[string]string)
- for k, v := range r.Header {
- if len(v) > 0 && strings.HasPrefix(k, PairNamePrefix) {
- pairMap[k] = v[0]
- }
- }
-
- if r.Method == "POST" {
- fileName, data, mimeType, isGzipped, originalDataSize, isChunkedFile, e = parseMultipart(r)
- } else {
- isGzipped = false
- mimeType = r.Header.Get("Content-Type")
- fileName = ""
- data, e = ioutil.ReadAll(r.Body)
- originalDataSize = len(data)
- }
- if e != nil {
- return
- }
-
- modifiedTime, _ = strconv.ParseUint(r.FormValue("ts"), 10, 64)
- ttl, _ = ReadTTL(r.FormValue("ttl"))
-
- return
-}
-func CreateNeedleFromRequest(r *http.Request, fixJpgOrientation bool) (n *Needle, originalSize int, e error) {
- var pairMap map[string]string
- fname, mimeType, isGzipped, isChunkedFile := "", "", false, false
+func CreateNeedleFromRequest(r *http.Request, fixJpgOrientation bool, sizeLimit int64) (n *Needle, originalSize int, e error) {
n = new(Needle)
- fname, n.Data, mimeType, pairMap, isGzipped, originalSize, n.LastModified, n.Ttl, isChunkedFile, e = ParseUpload(r)
+ pu, e := ParseUpload(r, sizeLimit)
if e != nil {
return
}
- if len(fname) < 256 {
- n.Name = []byte(fname)
+ n.Data = pu.Data
+ originalSize = pu.OriginalDataSize
+ n.LastModified = pu.ModifiedTime
+ n.Ttl = pu.Ttl
+
+ if len(pu.FileName) < 256 {
+ n.Name = []byte(pu.FileName)
n.SetHasName()
}
- if len(mimeType) < 256 {
- n.Mime = []byte(mimeType)
+ if len(pu.MimeType) < 256 {
+ n.Mime = []byte(pu.MimeType)
n.SetHasMime()
}
- if len(pairMap) != 0 {
+ if len(pu.PairMap) != 0 {
trimmedPairMap := make(map[string]string)
- for k, v := range pairMap {
+ for k, v := range pu.PairMap {
trimmedPairMap[k[len(PairNamePrefix):]] = v
}
@@ -107,7 +80,7 @@ func CreateNeedleFromRequest(r *http.Request, fixJpgOrientation bool) (n *Needle
n.SetHasPairs()
}
}
- if isGzipped {
+ if pu.IsGzipped {
n.SetGzipped()
}
if n.LastModified == 0 {
@@ -118,13 +91,13 @@ func CreateNeedleFromRequest(r *http.Request, fixJpgOrientation bool) (n *Needle
n.SetHasTtl()
}
- if isChunkedFile {
+ if pu.IsChunkedFile {
n.SetIsChunkManifest()
}
if fixJpgOrientation {
- loweredName := strings.ToLower(fname)
- if mimeType == "image/jpeg" || strings.HasSuffix(loweredName, ".jpg") || strings.HasSuffix(loweredName, ".jpeg") {
+ loweredName := strings.ToLower(pu.FileName)
+ if pu.MimeType == "image/jpeg" || strings.HasSuffix(loweredName, ".jpg") || strings.HasSuffix(loweredName, ".jpeg") {
n.Data = images.FixJpgOrientation(n.Data)
}
}
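
CreateNeedleFromRequest now threads a sizeLimit down into ParseUpload and consumes one ParsedUpload struct instead of ten loose return values. A hedged caller sketch; the 1GB limit and the log wording are only examples:

func exampleCreate(r *http.Request) {
	// fixJpgOrientation=true, 1GB upload limit (illustrative values)
	n, originalSize, err := needle.CreateNeedleFromRequest(r, true, 1024*1024*1024)
	if err != nil {
		glog.V(0).Infof("create needle: %v", err)
		return
	}
	glog.V(1).Infof("needle %v holds %d original bytes", n, originalSize)
}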
diff --git a/weed/storage/needle/needle_parse_multipart.go b/weed/storage/needle/needle_parse_multipart.go
deleted file mode 100644
index 8be1a1da4..000000000
--- a/weed/storage/needle/needle_parse_multipart.go
+++ /dev/null
@@ -1,109 +0,0 @@
-package needle
-
-import (
- "github.com/chrislusf/seaweedfs/weed/glog"
- "github.com/chrislusf/seaweedfs/weed/util"
-
- "io"
- "io/ioutil"
- "mime"
- "net/http"
- "path"
- "strconv"
- "strings"
-)
-
-func parseMultipart(r *http.Request) (
- fileName string, data []byte, mimeType string, isGzipped bool, originalDataSize int, isChunkedFile bool, e error) {
- defer func() {
- if e != nil && r.Body != nil {
- io.Copy(ioutil.Discard, r.Body)
- r.Body.Close()
- }
- }()
- form, fe := r.MultipartReader()
- if fe != nil {
- glog.V(0).Infoln("MultipartReader [ERROR]", fe)
- e = fe
- return
- }
-
- //first multi-part item
- part, fe := form.NextPart()
- if fe != nil {
- glog.V(0).Infoln("Reading Multi part [ERROR]", fe)
- e = fe
- return
- }
-
- fileName = part.FileName()
- if fileName != "" {
- fileName = path.Base(fileName)
- }
-
- data, e = ioutil.ReadAll(part)
- if e != nil {
- glog.V(0).Infoln("Reading Content [ERROR]", e)
- return
- }
-
- //if the filename is empty string, do a search on the other multi-part items
- for fileName == "" {
- part2, fe := form.NextPart()
- if fe != nil {
- break // no more or on error, just safely break
- }
-
- fName := part2.FileName()
-
- //found the first <file type> multi-part has filename
- if fName != "" {
- data2, fe2 := ioutil.ReadAll(part2)
- if fe2 != nil {
- glog.V(0).Infoln("Reading Content [ERROR]", fe2)
- e = fe2
- return
- }
-
- //update
- data = data2
- fileName = path.Base(fName)
- break
- }
- }
-
- originalDataSize = len(data)
-
- isChunkedFile, _ = strconv.ParseBool(r.FormValue("cm"))
-
- if !isChunkedFile {
-
- dotIndex := strings.LastIndex(fileName, ".")
- ext, mtype := "", ""
- if dotIndex > 0 {
- ext = strings.ToLower(fileName[dotIndex:])
- mtype = mime.TypeByExtension(ext)
- }
- contentType := part.Header.Get("Content-Type")
- if contentType != "" && mtype != contentType {
- mimeType = contentType //only return mime type if not deductable
- mtype = contentType
- }
-
- if part.Header.Get("Content-Encoding") == "gzip" {
- if unzipped, e := util.UnGzipData(data); e == nil {
- originalDataSize = len(unzipped)
- }
- isGzipped = true
- } else if util.IsGzippable(ext, mtype, data) {
- if compressedData, err := util.GzipData(data); err == nil {
- if len(data) > len(compressedData) {
- data = compressedData
- isGzipped = true
- }
- }
- }
- }
-
- return
-}
diff --git a/weed/storage/needle/needle_parse_upload.go b/weed/storage/needle/needle_parse_upload.go
new file mode 100644
index 000000000..85526aaa8
--- /dev/null
+++ b/weed/storage/needle/needle_parse_upload.go
@@ -0,0 +1,166 @@
+package needle
+
+import (
+ "fmt"
+ "io"
+ "io/ioutil"
+ "mime"
+ "net/http"
+ "path"
+ "strconv"
+ "strings"
+
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/util"
+)
+
+type ParsedUpload struct {
+ FileName string
+ Data []byte
+ MimeType string
+ PairMap map[string]string
+ IsGzipped bool
+ OriginalDataSize int
+ ModifiedTime uint64
+ Ttl *TTL
+ IsChunkedFile bool
+ UncompressedData []byte
+}
+
+func ParseUpload(r *http.Request, sizeLimit int64) (pu *ParsedUpload, e error) {
+ pu = &ParsedUpload{}
+ pu.PairMap = make(map[string]string)
+ for k, v := range r.Header {
+ if len(v) > 0 && strings.HasPrefix(k, PairNamePrefix) {
+ pu.PairMap[k] = v[0]
+ }
+ }
+
+ if r.Method == "POST" {
+ e = parseMultipart(r, sizeLimit, pu)
+ } else {
+ e = parsePut(r, sizeLimit, pu)
+ }
+ if e != nil {
+ return
+ }
+
+ pu.ModifiedTime, _ = strconv.ParseUint(r.FormValue("ts"), 10, 64)
+ pu.Ttl, _ = ReadTTL(r.FormValue("ttl"))
+
+ pu.OriginalDataSize = len(pu.Data)
+ pu.UncompressedData = pu.Data
+ if pu.IsGzipped {
+ if unzipped, e := util.UnGzipData(pu.Data); e == nil {
+ pu.OriginalDataSize = len(unzipped)
+ pu.UncompressedData = unzipped
+ }
+ } else if shouldGzip, _ := util.IsGzippableFileType("", pu.MimeType); shouldGzip {
+ if compressedData, err := util.GzipData(pu.Data); err == nil {
+ pu.Data = compressedData
+ pu.IsGzipped = true
+ }
+ }
+
+ return
+}
+
+func parsePut(r *http.Request, sizeLimit int64, pu *ParsedUpload) (e error) {
+ pu.IsGzipped = r.Header.Get("Content-Encoding") == "gzip"
+ pu.MimeType = r.Header.Get("Content-Type")
+ pu.FileName = ""
+ pu.Data, e = ioutil.ReadAll(io.LimitReader(r.Body, sizeLimit+1))
+ if e == io.EOF || int64(len(pu.Data)) == sizeLimit+1 {
+ io.Copy(ioutil.Discard, r.Body)
+ }
+ r.Body.Close()
+ return nil
+}
+
+func parseMultipart(r *http.Request, sizeLimit int64, pu *ParsedUpload) (e error) {
+ defer func() {
+ if e != nil && r.Body != nil {
+ io.Copy(ioutil.Discard, r.Body)
+ r.Body.Close()
+ }
+ }()
+ form, fe := r.MultipartReader()
+ if fe != nil {
+ glog.V(0).Infoln("MultipartReader [ERROR]", fe)
+ e = fe
+ return
+ }
+
+ //first multi-part item
+ part, fe := form.NextPart()
+ if fe != nil {
+ glog.V(0).Infoln("Reading Multi part [ERROR]", fe)
+ e = fe
+ return
+ }
+
+ pu.FileName = part.FileName()
+ if pu.FileName != "" {
+ pu.FileName = path.Base(pu.FileName)
+ }
+
+ pu.Data, e = ioutil.ReadAll(io.LimitReader(part, sizeLimit+1))
+ if e != nil {
+ glog.V(0).Infoln("Reading Content [ERROR]", e)
+ return
+ }
+ if len(pu.Data) == int(sizeLimit)+1 {
+ e = fmt.Errorf("file exceeds the %d byte limit", sizeLimit)
+ return
+ }
+
+ //if the filename is empty string, do a search on the other multi-part items
+ for pu.FileName == "" {
+ part2, fe := form.NextPart()
+ if fe != nil {
+ break // no more or on error, just safely break
+ }
+
+ fName := part2.FileName()
+
+ // use the first multi-part item that carries a filename
+ if fName != "" {
+ data2, fe2 := ioutil.ReadAll(io.LimitReader(part2, sizeLimit+1))
+ if fe2 != nil {
+ glog.V(0).Infoln("Reading Content [ERROR]", fe2)
+ e = fe2
+ return
+ }
+ if len(data2) == int(sizeLimit)+1 {
+ e = fmt.Errorf("file exceeds the %d byte limit", sizeLimit)
+ return
+ }
+
+ //update
+ pu.Data = data2
+ pu.FileName = path.Base(fName)
+ break
+ }
+ }
+
+ pu.IsChunkedFile, _ = strconv.ParseBool(r.FormValue("cm"))
+
+ if !pu.IsChunkedFile {
+
+ dotIndex := strings.LastIndex(pu.FileName, ".")
+ ext, mtype := "", ""
+ if dotIndex > 0 {
+ ext = strings.ToLower(pu.FileName[dotIndex:])
+ mtype = mime.TypeByExtension(ext)
+ }
+ contentType := part.Header.Get("Content-Type")
+ if contentType != "" && contentType != "application/octet-stream" && mtype != contentType {
+ pu.MimeType = contentType // only keep the mime type if it is not deducible from the extension
+ mtype = contentType
+ }
+
+ pu.IsGzipped = part.Header.Get("Content-Encoding") == "gzip"
+ }
+
+ return
+}
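
The size cap in both parse paths uses the same trick: read through io.LimitReader with sizeLimit+1, and if exactly sizeLimit+1 bytes come back, the payload must exceed the limit. The idiom in isolation (readWithLimit is a hypothetical helper, not in this change):

// readWithLimit returns the body only if it fits within sizeLimit; a minimal
// sketch of the LimitReader idiom used by parsePut and parseMultipart above.
func readWithLimit(r io.Reader, sizeLimit int64) ([]byte, error) {
	data, err := ioutil.ReadAll(io.LimitReader(r, sizeLimit+1))
	if err != nil {
		return nil, err
	}
	if int64(len(data)) == sizeLimit+1 {
		return nil, fmt.Errorf("content exceeds the %d byte limit", sizeLimit)
	}
	return data, nil
}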
diff --git a/weed/storage/needle/needle_read_write.go b/weed/storage/needle/needle_read_write.go
index 8e5d18b1a..7f8aa4823 100644
--- a/weed/storage/needle/needle_read_write.go
+++ b/weed/storage/needle/needle_read_write.go
@@ -125,13 +125,13 @@ func (n *Needle) prepareWriteBuffer(version Version) ([]byte, uint32, int64, err
return writeBytes, 0, 0, fmt.Errorf("Unsupported Version! (%d)", version)
}
-func (n *Needle) Append(w backend.DataStorageBackend, version Version) (offset uint64, size uint32, actualSize int64, err error) {
+func (n *Needle) Append(w backend.BackendStorageFile, version Version) (offset uint64, size uint32, actualSize int64, err error) {
if end, _, e := w.GetStat(); e == nil {
- defer func(w backend.DataStorageBackend, off int64) {
+ defer func(w backend.BackendStorageFile, off int64) {
if err != nil {
if te := w.Truncate(end); te != nil {
- glog.V(0).Infof("Failed to truncate %s back to %d with error: %v", w.String(), end, te)
+ glog.V(0).Infof("Failed to truncate %s back to %d with error: %v", w.Name(), end, te)
}
}
}(w, end)
@@ -150,7 +150,7 @@ func (n *Needle) Append(w backend.DataStorageBackend, version Version) (offset u
return offset, size, actualSize, err
}
-func ReadNeedleBlob(r backend.DataStorageBackend, offset int64, size uint32, version Version) (dataSlice []byte, err error) {
+func ReadNeedleBlob(r backend.BackendStorageFile, offset int64, size uint32, version Version) (dataSlice []byte, err error) {
dataSize := GetActualSize(size, version)
dataSlice = make([]byte, int(dataSize))
@@ -191,7 +191,7 @@ func (n *Needle) ReadBytes(bytes []byte, offset int64, size uint32, version Vers
}
// ReadData hydrates the needle from the file, with only n.Id is set.
-func (n *Needle) ReadData(r backend.DataStorageBackend, offset int64, size uint32, version Version) (err error) {
+func (n *Needle) ReadData(r backend.BackendStorageFile, offset int64, size uint32, version Version) (err error) {
bytes, err := ReadNeedleBlob(r, offset, size, version)
if err != nil {
return err
@@ -266,7 +266,7 @@ func (n *Needle) readNeedleDataVersion2(bytes []byte) (err error) {
return nil
}
-func ReadNeedleHeader(r backend.DataStorageBackend, version Version, offset int64) (n *Needle, bytes []byte, bodyLength int64, err error) {
+func ReadNeedleHeader(r backend.BackendStorageFile, version Version, offset int64) (n *Needle, bytes []byte, bodyLength int64, err error) {
n = new(Needle)
if version == Version1 || version == Version2 || version == Version3 {
bytes = make([]byte, NeedleHeaderSize)
@@ -301,7 +301,7 @@ func NeedleBodyLength(needleSize uint32, version Version) int64 {
//n should be a needle already read the header
//the input stream will read until next file entry
-func (n *Needle) ReadNeedleBody(r backend.DataStorageBackend, version Version, offset int64, bodyLength int64) (bytes []byte, err error) {
+func (n *Needle) ReadNeedleBody(r backend.BackendStorageFile, version Version, offset int64, bodyLength int64) (bytes []byte, err error) {
if bodyLength <= 0 {
return nil, nil
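
The Append change above keeps its rollback behavior: record the end offset before writing, then truncate back on any failure so a torn needle never becomes readable. The same pattern in isolation, as a sketch against the BackendStorageFile interface (appendRecord is illustrative, not part of the package; assumes the backend and glog imports):

    // appendRecord writes buf at the end of f, rolling the file back on error.
    func appendRecord(f backend.BackendStorageFile, buf []byte) (offset int64, err error) {
        if offset, _, err = f.GetStat(); err != nil {
            return 0, err
        }
        if _, err = f.WriteAt(buf, offset); err != nil {
            // undo the partial write so readers never observe a torn record
            if te := f.Truncate(offset); te != nil {
                glog.V(0).Infof("truncate %s back to %d: %v", f.Name(), offset, te)
            }
        }
        return offset, err
    }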
diff --git a/weed/storage/needle/volume_ttl.go b/weed/storage/needle/volume_ttl.go
index 4a169870d..179057876 100644
--- a/weed/storage/needle/volume_ttl.go
+++ b/weed/storage/needle/volume_ttl.go
@@ -69,6 +69,9 @@ func (t *TTL) ToBytes(output []byte) {
}
func (t *TTL) ToUint32() (output uint32) {
+ if t == nil || t.Count == 0 {
+ return 0
+ }
output = uint32(t.Count) << 8
output += uint32(t.Unit)
return output
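
The new nil/zero guard lets an unset TTL encode as 0 instead of dereferencing a nil receiver. For intuition, the uint32 layout is the Count in bits 8 and above with the Unit byte in the low 8 bits; a small round trip using only this package's API:

    ttl, _ := needle.ReadTTL("15d")  // Count=15, Unit=day
    encoded := ttl.ToUint32()        // uint32(15)<<8 | uint32(unit byte)

    var unset *needle.TTL
    zero := unset.ToUint32()         // now safely 0 for a nil TTL
    _, _ = encoded, zero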
diff --git a/weed/storage/needle_map/btree_map.go b/weed/storage/needle_map/btree_map.go
deleted file mode 100644
index a26c5e068..000000000
--- a/weed/storage/needle_map/btree_map.go
+++ /dev/null
@@ -1,53 +0,0 @@
-package needle_map
-
-import (
- . "github.com/chrislusf/seaweedfs/weed/storage/types"
- "github.com/google/btree"
-)
-
-//This map assumes mostly inserting increasing keys
-type BtreeMap struct {
- tree *btree.BTree
-}
-
-func NewBtreeMap() *BtreeMap {
- return &BtreeMap{
- tree: btree.New(32),
- }
-}
-
-func (cm *BtreeMap) Set(key NeedleId, offset Offset, size uint32) (oldOffset Offset, oldSize uint32) {
- found := cm.tree.ReplaceOrInsert(NeedleValue{key, offset, size})
- if found != nil {
- old := found.(NeedleValue)
- return old.Offset, old.Size
- }
- return
-}
-
-func (cm *BtreeMap) Delete(key NeedleId) (oldSize uint32) {
- found := cm.tree.Delete(NeedleValue{key, Offset{}, 0})
- if found != nil {
- old := found.(NeedleValue)
- return old.Size
- }
- return
-}
-func (cm *BtreeMap) Get(key NeedleId) (*NeedleValue, bool) {
- found := cm.tree.Get(NeedleValue{key, Offset{}, 0})
- if found != nil {
- old := found.(NeedleValue)
- return &old, true
- }
- return nil, false
-}
-
-// Visit visits all entries or stop if any error when visiting
-func (cm *BtreeMap) AscendingVisit(visit func(NeedleValue) error) (ret error) {
- cm.tree.Ascend(func(item btree.Item) bool {
- needle := item.(NeedleValue)
- ret = visit(needle)
- return ret == nil
- })
- return ret
-}
diff --git a/weed/storage/needle_map/compact_map_test.go b/weed/storage/needle_map/compact_map_test.go
index 3bad85727..7eea3969a 100644
--- a/weed/storage/needle_map/compact_map_test.go
+++ b/weed/storage/needle_map/compact_map_test.go
@@ -8,7 +8,14 @@ import (
func TestOverflow2(t *testing.T) {
m := NewCompactMap()
- m.Set(NeedleId(150088), ToOffset(8), 3000073)
+ _, oldSize := m.Set(NeedleId(150088), ToOffset(8), 3000073)
+ if oldSize != 0 {
+ t.Fatalf("expecting no previous data")
+ }
+ _, oldSize = m.Set(NeedleId(150088), ToOffset(8), 3000073)
+ if oldSize != 3000073 {
+ t.Fatalf("expecting previous data size is %d, not %d", 3000073, oldSize)
+ }
m.Set(NeedleId(150073), ToOffset(8), 3000073)
m.Set(NeedleId(150089), ToOffset(8), 3000073)
m.Set(NeedleId(150076), ToOffset(8), 3000073)
diff --git a/weed/storage/needle_map/memdb.go b/weed/storage/needle_map/memdb.go
new file mode 100644
index 000000000..a52d52a10
--- /dev/null
+++ b/weed/storage/needle_map/memdb.go
@@ -0,0 +1,119 @@
+package needle_map
+
+import (
+ "fmt"
+ "os"
+
+ "github.com/syndtr/goleveldb/leveldb"
+ "github.com/syndtr/goleveldb/leveldb/opt"
+ "github.com/syndtr/goleveldb/leveldb/storage"
+
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/storage/idx"
+ . "github.com/chrislusf/seaweedfs/weed/storage/types"
+ "github.com/chrislusf/seaweedfs/weed/util"
+)
+
+//This map uses in memory level db
+type MemDb struct {
+ db *leveldb.DB
+}
+
+func NewMemDb() *MemDb {
+ opts := &opt.Options{}
+
+ var err error
+ t := &MemDb{}
+ if t.db, err = leveldb.Open(storage.NewMemStorage(), opts); err != nil {
+ glog.V(0).Infof("MemDb fails to open: %v", err)
+ return nil
+ }
+
+ return t
+}
+
+func (cm *MemDb) Set(key NeedleId, offset Offset, size uint32) error {
+
+ bytes := ToBytes(key, offset, size)
+
+ if err := cm.db.Put(bytes[0:NeedleIdSize], bytes[NeedleIdSize:NeedleIdSize+OffsetSize+SizeSize], nil); err != nil {
+ return fmt.Errorf("failed to write temp leveldb: %v", err)
+ }
+ return nil
+}
+
+func (cm *MemDb) Delete(key NeedleId) error {
+ bytes := make([]byte, NeedleIdSize)
+ NeedleIdToBytes(bytes, key)
+ return cm.db.Delete(bytes, nil)
+
+}
+func (cm *MemDb) Get(key NeedleId) (*NeedleValue, bool) {
+ bytes := make([]byte, NeedleIdSize)
+ NeedleIdToBytes(bytes[0:NeedleIdSize], key)
+ data, err := cm.db.Get(bytes, nil)
+ if err != nil || len(data) != OffsetSize+SizeSize {
+ return nil, false
+ }
+ offset := BytesToOffset(data[0:OffsetSize])
+ size := util.BytesToUint32(data[OffsetSize : OffsetSize+SizeSize])
+ return &NeedleValue{Key: key, Offset: offset, Size: size}, true
+}
+
+// Visit visits all entries or stop if any error when visiting
+func (cm *MemDb) AscendingVisit(visit func(NeedleValue) error) (ret error) {
+ iter := cm.db.NewIterator(nil, nil)
+ defer iter.Release() // release even when visit aborts the walk early
+ for iter.Next() {
+ key := BytesToNeedleId(iter.Key())
+ data := iter.Value()
+ offset := BytesToOffset(data[0:OffsetSize])
+ size := util.BytesToUint32(data[OffsetSize : OffsetSize+SizeSize])
+
+ needle := NeedleValue{Key: key, Offset: offset, Size: size}
+ ret = visit(needle)
+ if ret != nil {
+ return
+ }
+ }
+ ret = iter.Error()
+
+ return
+}
+
+func (cm *MemDb) SaveToIdx(idxName string) (ret error) {
+ idxFile, err := os.OpenFile(idxName, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644)
+ if err != nil {
+ return
+ }
+ defer idxFile.Close()
+
+ return cm.AscendingVisit(func(value NeedleValue) error {
+ if value.Offset.IsZero() || value.Size == TombstoneFileSize {
+ return nil
+ }
+ _, err := idxFile.Write(value.ToBytes())
+ return err
+ })
+
+}
+
+func (cm *MemDb) LoadFromIdx(idxName string) (ret error) {
+ idxFile, err := os.OpenFile(idxName, os.O_RDONLY, 0644)
+ if err != nil {
+ return
+ }
+ defer idxFile.Close()
+
+ return idx.WalkIndexFile(idxFile, func(key NeedleId, offset Offset, size uint32) error {
+ if offset.IsZero() || size == TombstoneFileSize {
+ return cm.Delete(key)
+ }
+ return cm.Set(key, offset, size)
+ })
+
+}
+
+func (cm *MemDb) Close() {
+ cm.db.Close()
+}
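
A short usage sketch for the new in-memory map: replay an existing index file, look up one needle, and write back a compacted index. The paths and the needle id are hypothetical; LoadFromIdx applies deletes and tombstones in order, and SaveToIdx drops them:

    package main

    import (
        "fmt"

        "github.com/chrislusf/seaweedfs/weed/storage/needle_map"
        "github.com/chrislusf/seaweedfs/weed/storage/types"
    )

    func main() {
        db := needle_map.NewMemDb()
        defer db.Close()

        if err := db.LoadFromIdx("/tmp/1.idx"); err != nil { // hypothetical path
            fmt.Println("load:", err)
            return
        }
        if v, ok := db.Get(types.NeedleId(345)); ok {
            fmt.Println("offset:", v.Offset, "size:", v.Size)
        }
        // only live entries are written; zero offsets and tombstones are skipped
        _ = db.SaveToIdx("/tmp/1.compacted.idx")
    }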
diff --git a/weed/storage/needle_map/memdb_test.go b/weed/storage/needle_map/memdb_test.go
new file mode 100644
index 000000000..7b45d23f8
--- /dev/null
+++ b/weed/storage/needle_map/memdb_test.go
@@ -0,0 +1,23 @@
+package needle_map
+
+import (
+ "testing"
+
+ "github.com/chrislusf/seaweedfs/weed/storage/types"
+)
+
+func BenchmarkMemDb(b *testing.B) {
+ b.ReportAllocs()
+ for i := 0; i < b.N; i++ {
+ nm := NewMemDb()
+
+ nid := types.NeedleId(345)
+ offset := types.Offset{
+ OffsetHigher: types.OffsetHigher{},
+ OffsetLower: types.OffsetLower{},
+ }
+ nm.Set(nid, offset, 324)
+ nm.Close()
+ }
+
+}
diff --git a/weed/storage/needle_map_leveldb.go b/weed/storage/needle_map_leveldb.go
index ef8571e83..3bb258559 100644
--- a/weed/storage/needle_map_leveldb.go
+++ b/weed/storage/needle_map_leveldb.go
@@ -128,8 +128,17 @@ func (m *LevelDbNeedleMap) Delete(key NeedleId, offset Offset) error {
}
func (m *LevelDbNeedleMap) Close() {
- m.indexFile.Close()
- m.db.Close()
+ indexFileName := m.indexFile.Name()
+ if err := m.indexFile.Sync(); err != nil {
+ glog.Warningf("sync file %s failed: %v", indexFileName, err)
+ }
+ if err := m.indexFile.Close(); err != nil {
+ glog.Warningf("close index file %s failed: %v", indexFileName, err)
+ }
+
+ if err := m.db.Close(); err != nil {
+ glog.Warningf("close levelDB failed: %v", err)
+ }
}
func (m *LevelDbNeedleMap) Destroy() error {
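
The Close change fsyncs the index before closing, so entries still buffered in the page cache survive a crash that follows a clean-looking Close. The sync-then-close shape, extracted as a standalone sketch (assumes the glog import):

    // closeSynced flushes f to stable storage before closing it, logging
    // rather than failing on errors, mirroring LevelDbNeedleMap.Close above.
    func closeSynced(f *os.File) {
        name := f.Name()
        if err := f.Sync(); err != nil {
            glog.Warningf("sync %s failed: %v", name, err)
        }
        if err := f.Close(); err != nil {
            glog.Warningf("close %s failed: %v", name, err)
        }
    }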
diff --git a/weed/storage/needle_map_memory.go b/weed/storage/needle_map_memory.go
index ee639a7e6..84197912f 100644
--- a/weed/storage/needle_map_memory.go
+++ b/weed/storage/needle_map_memory.go
@@ -22,24 +22,11 @@ func NewCompactNeedleMap(file *os.File) *NeedleMap {
return nm
}
-func NewBtreeNeedleMap(file *os.File) *NeedleMap {
- nm := &NeedleMap{
- m: needle_map.NewBtreeMap(),
- }
- nm.indexFile = file
- return nm
-}
-
func LoadCompactNeedleMap(file *os.File) (*NeedleMap, error) {
nm := NewCompactNeedleMap(file)
return doLoading(file, nm)
}
-func LoadBtreeNeedleMap(file *os.File) (*NeedleMap, error) {
- nm := NewBtreeNeedleMap(file)
- return doLoading(file, nm)
-}
-
func doLoading(file *os.File, nm *NeedleMap) (*NeedleMap, error) {
e := idx.WalkIndexFile(file, func(key NeedleId, offset Offset, size uint32) error {
nm.MaybeSetMaxFileKey(key)
@@ -47,14 +34,12 @@ func doLoading(file *os.File, nm *NeedleMap) (*NeedleMap, error) {
nm.FileCounter++
nm.FileByteCounter = nm.FileByteCounter + uint64(size)
oldOffset, oldSize := nm.m.Set(NeedleId(key), offset, size)
- // glog.V(3).Infoln("reading key", key, "offset", offset*NeedlePaddingSize, "size", size, "oldSize", oldSize)
if !oldOffset.IsZero() && oldSize != TombstoneFileSize {
nm.DeletionCounter++
nm.DeletionByteCounter = nm.DeletionByteCounter + uint64(oldSize)
}
} else {
oldSize := nm.m.Delete(NeedleId(key))
- // glog.V(3).Infoln("removing key", key, "offset", offset*NeedlePaddingSize, "size", size, "oldSize", oldSize)
nm.DeletionCounter++
nm.DeletionByteCounter = nm.DeletionByteCounter + uint64(oldSize)
}
@@ -79,6 +64,10 @@ func (nm *NeedleMap) Delete(key NeedleId, offset Offset) error {
return nm.appendToIndexFile(key, offset, TombstoneFileSize)
}
func (nm *NeedleMap) Close() {
+ indexFileName := nm.indexFile.Name()
+ if err := nm.indexFile.Sync(); err != nil {
+ glog.Warningf("sync file %s failed, %v", indexFileName, err)
+ }
_ = nm.indexFile.Close()
}
func (nm *NeedleMap) Destroy() error {
diff --git a/weed/storage/needle_map_metric_test.go b/weed/storage/needle_map_metric_test.go
index 539f83a87..ae2177a30 100644
--- a/weed/storage/needle_map_metric_test.go
+++ b/weed/storage/needle_map_metric_test.go
@@ -1,17 +1,18 @@
package storage
import (
- "github.com/chrislusf/seaweedfs/weed/glog"
- . "github.com/chrislusf/seaweedfs/weed/storage/types"
"io/ioutil"
"math/rand"
"testing"
+
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ . "github.com/chrislusf/seaweedfs/weed/storage/types"
)
func TestFastLoadingNeedleMapMetrics(t *testing.T) {
idxFile, _ := ioutil.TempFile("", "tmp.idx")
- nm := NewBtreeNeedleMap(idxFile)
+ nm := NewCompactNeedleMap(idxFile)
for i := 0; i < 10000; i++ {
nm.Put(Uint64ToNeedleId(uint64(i+1)), Uint32ToOffset(uint32(0)), uint32(1))
diff --git a/weed/storage/needle_map_sorted_file.go b/weed/storage/needle_map_sorted_file.go
new file mode 100644
index 000000000..e6f9258f3
--- /dev/null
+++ b/weed/storage/needle_map_sorted_file.go
@@ -0,0 +1,105 @@
+package storage
+
+import (
+ "os"
+
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/storage/erasure_coding"
+ "github.com/chrislusf/seaweedfs/weed/storage/needle_map"
+ . "github.com/chrislusf/seaweedfs/weed/storage/types"
+)
+
+type SortedFileNeedleMap struct {
+ baseNeedleMapper
+ baseFileName string
+ dbFile *os.File
+ dbFileSize int64
+}
+
+func NewSortedFileNeedleMap(baseFileName string, indexFile *os.File) (m *SortedFileNeedleMap, err error) {
+ m = &SortedFileNeedleMap{baseFileName: baseFileName}
+ m.indexFile = indexFile
+ fileName := baseFileName + ".sdx"
+ if !isSortedFileFresh(fileName, indexFile) {
+ glog.V(0).Infof("Start generating %s from %s", fileName, indexFile.Name())
+ if err = erasure_coding.WriteSortedFileFromIdx(baseFileName, ".sdx"); err != nil {
+ return nil, err
+ }
+ glog.V(0).Infof("Finished generating %s from %s", fileName, indexFile.Name())
+ }
+ glog.V(1).Infof("Opening %s...", fileName)
+
+ if m.dbFile, err = os.Open(fileName); err != nil {
+ return
+ }
+ dbStat, statErr := m.dbFile.Stat()
+ if statErr != nil {
+ return nil, statErr
+ }
+ m.dbFileSize = dbStat.Size()
+ glog.V(1).Infof("Loading %s...", indexFile.Name())
+ mm, indexLoadError := newNeedleMapMetricFromIndexFile(indexFile)
+ if indexLoadError != nil {
+ return nil, indexLoadError
+ }
+ m.mapMetric = *mm
+ return
+}
+
+func isSortedFileFresh(dbFileName string, indexFile *os.File) bool {
+ // normally we always write to index file first
+ dbFile, err := os.Open(dbFileName)
+ if err != nil {
+ return false
+ }
+ defer dbFile.Close()
+ dbStat, dbStatErr := dbFile.Stat()
+ indexStat, indexStatErr := indexFile.Stat()
+ if dbStatErr != nil || indexStatErr != nil {
+ glog.V(0).Infof("Cannot stat file: %v and %v", dbStatErr, indexStatErr)
+ return false
+ }
+
+ return dbStat.ModTime().After(indexStat.ModTime())
+}
+
+func (m *SortedFileNeedleMap) Get(key NeedleId) (element *needle_map.NeedleValue, ok bool) {
+ offset, size, err := erasure_coding.SearchNeedleFromSortedIndex(m.dbFile, m.dbFileSize, key, nil)
+ if err != nil {
+ return nil, false
+ }
+ return &needle_map.NeedleValue{Key: key, Offset: offset, Size: size}, true
+
+}
+
+func (m *SortedFileNeedleMap) Put(key NeedleId, offset Offset, size uint32) error {
+ return os.ErrInvalid
+}
+
+func (m *SortedFileNeedleMap) Delete(key NeedleId, offset Offset) error {
+
+ _, size, err := erasure_coding.SearchNeedleFromSortedIndex(m.dbFile, m.dbFileSize, key, nil)
+
+ if err != nil {
+ if err == erasure_coding.NotFoundError {
+ return nil
+ }
+ return err
+ }
+
+ if size == TombstoneFileSize {
+ return nil
+ }
+
+ // write to index file first
+ if err := m.appendToIndexFile(key, offset, TombstoneFileSize); err != nil {
+ return err
+ }
+ _, _, err = erasure_coding.SearchNeedleFromSortedIndex(m.dbFile, m.dbFileSize, key, erasure_coding.MarkNeedleDeleted)
+
+ return err
+}
+
+func (m *SortedFileNeedleMap) Close() {
+ m.indexFile.Close()
+ m.dbFile.Close()
+}
+
+func (m *SortedFileNeedleMap) Destroy() error {
+ m.Close()
+ os.Remove(m.indexFile.Name())
+ return os.Remove(m.baseFileName + ".sdx")
+}
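
SearchNeedleFromSortedIndex (from the erasure_coding package) is what makes the .sdx lookup cheap: the file holds fixed-size entries sorted by needle id, so a lookup is a binary search over file offsets. A simplified sketch of the idea, assuming the os and encoding/binary imports; the 16-byte entry layout here is an assumption for illustration, the real code takes its sizes from the types package:

    // searchSorted finds the file offset of the entry whose 8-byte big-endian
    // key equals wanted, using binary search over fixed-size records.
    func searchSorted(f *os.File, fileSize int64, wanted uint64) (found bool, entryOffset int64) {
        const entrySize = 16 // assumed: 8-byte key, 4-byte offset, 4-byte size
        lo, hi := int64(0), fileSize/entrySize
        buf := make([]byte, entrySize)
        for lo < hi {
            mid := (lo + hi) / 2
            if _, err := f.ReadAt(buf, mid*entrySize); err != nil {
                return false, 0
            }
            key := binary.BigEndian.Uint64(buf[:8])
            switch {
            case key == wanted:
                return true, mid * entrySize
            case key < wanted:
                lo = mid + 1
            default:
                hi = mid
            }
        }
        return false, 0
    }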
diff --git a/weed/storage/store.go b/weed/storage/store.go
index 4d1061bed..e29680f6f 100644
--- a/weed/storage/store.go
+++ b/weed/storage/store.go
@@ -2,14 +2,19 @@ package storage
import (
"fmt"
+ "path/filepath"
+ "strings"
"sync/atomic"
+ "google.golang.org/grpc"
+
"github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
"github.com/chrislusf/seaweedfs/weed/stats"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
+ "github.com/chrislusf/seaweedfs/weed/storage/super_block"
. "github.com/chrislusf/seaweedfs/weed/storage/types"
- "google.golang.org/grpc"
)
const (
@@ -60,7 +65,7 @@ func NewStore(grpcDialOption grpc.DialOption, port int, ip, publicUrl string, di
return
}
func (s *Store) AddVolume(volumeId needle.VolumeId, collection string, needleMapKind NeedleMapType, replicaPlacement string, ttlString string, preallocate int64, MemoryMapMaxSizeMb uint32) error {
- rt, e := NewReplicaPlacementFromString(replicaPlacement)
+ rt, e := super_block.NewReplicaPlacementFromString(replicaPlacement)
if e != nil {
return e
}
@@ -101,7 +106,7 @@ func (s *Store) FindFreeLocation() (ret *DiskLocation) {
}
return ret
}
-func (s *Store) addVolume(vid needle.VolumeId, collection string, needleMapKind NeedleMapType, replicaPlacement *ReplicaPlacement, ttl *needle.TTL, preallocate int64, memoryMapMaxSizeMb uint32) error {
+func (s *Store) addVolume(vid needle.VolumeId, collection string, needleMapKind NeedleMapType, replicaPlacement *super_block.ReplicaPlacement, ttl *needle.TTL, preallocate int64, memoryMapMaxSizeMb uint32) error {
if s.findVolume(vid) != nil {
return fmt.Errorf("Volume Id %d already exists!", vid)
}
@@ -126,10 +131,10 @@ func (s *Store) addVolume(vid needle.VolumeId, collection string, needleMapKind
return fmt.Errorf("No more free space left")
}
-func (s *Store) Status() []*VolumeInfo {
+func (s *Store) VolumeInfos() []*VolumeInfo {
var stats []*VolumeInfo
for _, location := range s.Locations {
- location.RLock()
+ location.volumesLock.RLock()
for k, v := range location.volumes {
s := &VolumeInfo{
Id: needle.VolumeId(k),
@@ -140,13 +145,14 @@ func (s *Store) Status() []*VolumeInfo {
FileCount: int(v.FileCount()),
DeleteCount: int(v.DeletedCount()),
DeletedByteCount: v.DeletedSize(),
- ReadOnly: v.readOnly,
+ ReadOnly: v.noWriteOrDelete || v.noWriteCanDelete,
Ttl: v.Ttl,
CompactRevision: uint32(v.CompactionRevision),
}
+ s.RemoteStorageName, s.RemoteStorageKey = v.RemoteStorageNameKey()
stats = append(stats, s)
}
- location.RUnlock()
+ location.volumesLock.RUnlock()
}
sortVolumeInfos(stats)
return stats
@@ -167,7 +173,7 @@ func (s *Store) CollectHeartbeat() *master_pb.Heartbeat {
for _, location := range s.Locations {
var deleteVids []needle.VolumeId
maxVolumeCount = maxVolumeCount + location.MaxVolumeCount
- location.RLock()
+ location.volumesLock.RLock()
for _, v := range location.volumes {
if maxFileKey < v.MaxFileKey() {
maxFileKey = v.MaxFileKey()
@@ -184,16 +190,16 @@ func (s *Store) CollectHeartbeat() *master_pb.Heartbeat {
fileSize, _, _ := v.FileStat()
collectionVolumeSize[v.Collection] += fileSize
}
- location.RUnlock()
+ location.volumesLock.RUnlock()
if len(deleteVids) > 0 {
// delete expired volumes.
- location.Lock()
+ location.volumesLock.Lock()
for _, vid := range deleteVids {
location.deleteVolumeById(vid)
glog.V(0).Infoln("volume", vid, "is deleted.")
}
- location.Unlock()
+ location.volumesLock.Unlock()
}
}
@@ -223,11 +229,11 @@ func (s *Store) Close() {
func (s *Store) WriteVolumeNeedle(i needle.VolumeId, n *needle.Needle) (size uint32, isUnchanged bool, err error) {
if v := s.findVolume(i); v != nil {
- if v.readOnly {
+ if v.noWriteOrDelete || v.noWriteCanDelete {
err = fmt.Errorf("volume %d is read only", i)
return
}
- if MaxPossibleVolumeSize >= v.ContentSize()+uint64(needle.GetActualSize(size, v.version)) {
+ if MaxPossibleVolumeSize >= v.ContentSize()+uint64(needle.GetActualSize(size, v.Version())) {
_, size, isUnchanged, err = v.writeNeedle(n)
} else {
err = fmt.Errorf("volume size limit %d exceeded! current size is %d", s.GetVolumeSizeLimit(), v.ContentSize())
@@ -241,10 +247,10 @@ func (s *Store) WriteVolumeNeedle(i needle.VolumeId, n *needle.Needle) (size uin
func (s *Store) DeleteVolumeNeedle(i needle.VolumeId, n *needle.Needle) (uint32, error) {
if v := s.findVolume(i); v != nil {
- if v.readOnly {
+ if v.noWriteOrDelete {
return 0, fmt.Errorf("volume %d is read only", i)
}
- if MaxPossibleVolumeSize >= v.ContentSize()+uint64(needle.GetActualSize(0, v.version)) {
+ if MaxPossibleVolumeSize >= v.ContentSize()+uint64(needle.GetActualSize(0, v.Version())) {
return v.deleteNeedle(n)
} else {
return 0, fmt.Errorf("volume size limit %d exceeded! current size is %d", s.GetVolumeSizeLimit(), v.ContentSize())
@@ -273,7 +279,7 @@ func (s *Store) MarkVolumeReadonly(i needle.VolumeId) error {
if v == nil {
return fmt.Errorf("volume %d not found", i)
}
- v.readOnly = true
+ v.noWriteOrDelete = true
return nil
}
@@ -343,6 +349,31 @@ func (s *Store) DeleteVolume(i needle.VolumeId) error {
return fmt.Errorf("volume %d not found on disk", i)
}
+func (s *Store) ConfigureVolume(i needle.VolumeId, replication string) error {
+
+ for _, location := range s.Locations {
+ fileInfo, found := location.LocateVolume(i)
+ if !found {
+ continue
+ }
+ // load, modify, save
+ baseFileName := strings.TrimSuffix(fileInfo.Name(), filepath.Ext(fileInfo.Name()))
+ vifFile := filepath.Join(location.Directory, baseFileName+".vif")
+ volumeInfo, _, err := pb.MaybeLoadVolumeInfo(vifFile)
+ if err != nil {
+ return fmt.Errorf("volume %d failed to load vif: %v", i, err)
+ }
+ volumeInfo.Replication = replication
+ err = pb.SaveVolumeInfo(vifFile, volumeInfo)
+ if err != nil {
+ return fmt.Errorf("volume %d failed to save vif: %v", i, err)
+ }
+ return nil
+ }
+
+ return fmt.Errorf("volume %d not found on disk", i)
+}
+
func (s *Store) SetVolumeSizeLimit(x uint64) {
atomic.StoreUint64(&s.volumeSizeLimit, x)
}
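
ConfigureVolume introduces a load-modify-save cycle over the per-volume .vif file (a protobuf-encoded VolumeInfo). The same shape works for any .vif field; a sketch reusing the pb helpers exactly as above, with a hypothetical path:

    func setReplication(vifFile, replication string) error {
        volumeInfo, _, err := pb.MaybeLoadVolumeInfo(vifFile) // e.g. "/data/1.vif"
        if err != nil {
            return err
        }
        volumeInfo.Replication = replication
        return pb.SaveVolumeInfo(vifFile, volumeInfo)
    }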
diff --git a/weed/storage/store_ec.go b/weed/storage/store_ec.go
index 7e3f1a46c..e423e7dca 100644
--- a/weed/storage/store_ec.go
+++ b/weed/storage/store_ec.go
@@ -8,6 +8,8 @@ import (
"sync"
"time"
+ "github.com/klauspost/reedsolomon"
+
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/operation"
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
@@ -16,7 +18,6 @@ import (
"github.com/chrislusf/seaweedfs/weed/storage/erasure_coding"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
"github.com/chrislusf/seaweedfs/weed/storage/types"
- "github.com/klauspost/reedsolomon"
)
func (s *Store) CollectErasureCodingHeartbeat() *master_pb.Heartbeat {
@@ -115,19 +116,11 @@ func (s *Store) DestroyEcVolume(vid needle.VolumeId) {
}
}
-func (s *Store) ReadEcShardNeedle(ctx context.Context, vid needle.VolumeId, n *needle.Needle) (int, error) {
+func (s *Store) ReadEcShardNeedle(vid needle.VolumeId, n *needle.Needle) (int, error) {
for _, location := range s.Locations {
if localEcVolume, found := location.FindEcVolume(vid); found {
- // read the volume version
- for localEcVolume.Version == 0 {
- err := s.readEcVolumeVersion(ctx, vid, localEcVolume)
- time.Sleep(1357 * time.Millisecond)
- glog.V(0).Infof("ReadEcShardNeedle vid %d version:%v: %v", vid, localEcVolume.Version, err)
- }
- version := localEcVolume.Version
-
- offset, size, intervals, err := localEcVolume.LocateEcShardNeedle(n.Id, version)
+ offset, size, intervals, err := localEcVolume.LocateEcShardNeedle(n.Id, localEcVolume.Version)
if err != nil {
return 0, fmt.Errorf("locate in local ec volume: %v", err)
}
@@ -140,7 +133,7 @@ func (s *Store) ReadEcShardNeedle(ctx context.Context, vid needle.VolumeId, n *n
if len(intervals) > 1 {
glog.V(3).Infof("ReadEcShardNeedle needle id %s intervals:%+v", n.String(), intervals)
}
- bytes, isDeleted, err := s.readEcShardIntervals(ctx, vid, n.Id, localEcVolume, intervals)
+ bytes, isDeleted, err := s.readEcShardIntervals(vid, n.Id, localEcVolume, intervals)
if err != nil {
return 0, fmt.Errorf("ReadEcShardIntervals: %v", err)
}
@@ -148,7 +141,7 @@ func (s *Store) ReadEcShardNeedle(ctx context.Context, vid needle.VolumeId, n *n
return 0, fmt.Errorf("ec entry %s is deleted", n.Id)
}
- err = n.ReadBytes(bytes, offset.ToAcutalOffset(), size, version)
+ err = n.ReadBytes(bytes, offset.ToAcutalOffset(), size, localEcVolume.Version)
if err != nil {
return 0, fmt.Errorf("readbytes: %v", err)
}
@@ -159,30 +152,14 @@ func (s *Store) ReadEcShardNeedle(ctx context.Context, vid needle.VolumeId, n *n
return 0, fmt.Errorf("ec shard %d not found", vid)
}
-func (s *Store) readEcVolumeVersion(ctx context.Context, vid needle.VolumeId, ecVolume *erasure_coding.EcVolume) (err error) {
-
- interval := erasure_coding.Interval{
- BlockIndex: 0,
- InnerBlockOffset: 0,
- Size: _SuperBlockSize,
- IsLargeBlock: true, // it could be large block, but ok in this place
- LargeBlockRowsCount: 0,
- }
- data, _, err := s.readEcShardIntervals(ctx, vid, 0, ecVolume, []erasure_coding.Interval{interval})
- if err == nil {
- ecVolume.Version = needle.Version(data[0])
- }
- return
-}
-
-func (s *Store) readEcShardIntervals(ctx context.Context, vid needle.VolumeId, needleId types.NeedleId, ecVolume *erasure_coding.EcVolume, intervals []erasure_coding.Interval) (data []byte, is_deleted bool, err error) {
+func (s *Store) readEcShardIntervals(vid needle.VolumeId, needleId types.NeedleId, ecVolume *erasure_coding.EcVolume, intervals []erasure_coding.Interval) (data []byte, is_deleted bool, err error) {
- if err = s.cachedLookupEcShardLocations(ctx, ecVolume); err != nil {
+ if err = s.cachedLookupEcShardLocations(ecVolume); err != nil {
return nil, false, fmt.Errorf("failed to locate shard via master grpc %s: %v", s.MasterAddress, err)
}
for i, interval := range intervals {
- if d, isDeleted, e := s.readOneEcShardInterval(ctx, needleId, ecVolume, interval); e != nil {
+ if d, isDeleted, e := s.readOneEcShardInterval(needleId, ecVolume, interval); e != nil {
return nil, isDeleted, e
} else {
if isDeleted {
@@ -198,7 +175,7 @@ func (s *Store) readEcShardIntervals(ctx context.Context, vid needle.VolumeId, n
return
}
-func (s *Store) readOneEcShardInterval(ctx context.Context, needleId types.NeedleId, ecVolume *erasure_coding.EcVolume, interval erasure_coding.Interval) (data []byte, is_deleted bool, err error) {
+func (s *Store) readOneEcShardInterval(needleId types.NeedleId, ecVolume *erasure_coding.EcVolume, interval erasure_coding.Interval) (data []byte, is_deleted bool, err error) {
shardId, actualOffset := interval.ToShardIdAndOffset(erasure_coding.ErasureCodingLargeBlockSize, erasure_coding.ErasureCodingSmallBlockSize)
data = make([]byte, interval.Size)
if shard, found := ecVolume.FindEcVolumeShard(shardId); found {
@@ -213,7 +190,7 @@ func (s *Store) readOneEcShardInterval(ctx context.Context, needleId types.Needl
// try reading directly
if hasShardIdLocation {
- _, is_deleted, err = s.readRemoteEcShardInterval(ctx, sourceDataNodes, needleId, ecVolume.VolumeId, shardId, data, actualOffset)
+ _, is_deleted, err = s.readRemoteEcShardInterval(sourceDataNodes, needleId, ecVolume.VolumeId, shardId, data, actualOffset)
if err == nil {
return
}
@@ -222,7 +199,7 @@ func (s *Store) readOneEcShardInterval(ctx context.Context, needleId types.Needl
}
// try reading by recovering from other shards
- _, is_deleted, err = s.recoverOneRemoteEcShardInterval(ctx, needleId, ecVolume, shardId, data, actualOffset)
+ _, is_deleted, err = s.recoverOneRemoteEcShardInterval(needleId, ecVolume, shardId, data, actualOffset)
if err == nil {
return
}
@@ -238,7 +215,7 @@ func forgetShardId(ecVolume *erasure_coding.EcVolume, shardId erasure_coding.Sha
ecVolume.ShardLocationsLock.Unlock()
}
-func (s *Store) cachedLookupEcShardLocations(ctx context.Context, ecVolume *erasure_coding.EcVolume) (err error) {
+func (s *Store) cachedLookupEcShardLocations(ecVolume *erasure_coding.EcVolume) (err error) {
shardCount := len(ecVolume.ShardLocations)
if shardCount < erasure_coding.DataShardsCount &&
@@ -257,7 +234,7 @@ func (s *Store) cachedLookupEcShardLocations(ctx context.Context, ecVolume *eras
req := &master_pb.LookupEcVolumeRequest{
VolumeId: uint32(ecVolume.VolumeId),
}
- resp, err := masterClient.LookupEcVolume(ctx, req)
+ resp, err := masterClient.LookupEcVolume(context.Background(), req)
if err != nil {
return fmt.Errorf("lookup ec volume %d: %v", ecVolume.VolumeId, err)
}
@@ -281,7 +258,7 @@ func (s *Store) cachedLookupEcShardLocations(ctx context.Context, ecVolume *eras
return
}
-func (s *Store) readRemoteEcShardInterval(ctx context.Context, sourceDataNodes []string, needleId types.NeedleId, vid needle.VolumeId, shardId erasure_coding.ShardId, buf []byte, offset int64) (n int, is_deleted bool, err error) {
+func (s *Store) readRemoteEcShardInterval(sourceDataNodes []string, needleId types.NeedleId, vid needle.VolumeId, shardId erasure_coding.ShardId, buf []byte, offset int64) (n int, is_deleted bool, err error) {
if len(sourceDataNodes) == 0 {
return 0, false, fmt.Errorf("failed to find ec shard %d.%d", vid, shardId)
@@ -289,7 +266,7 @@ func (s *Store) readRemoteEcShardInterval(ctx context.Context, sourceDataNodes [
for _, sourceDataNode := range sourceDataNodes {
glog.V(3).Infof("read remote ec shard %d.%d from %s", vid, shardId, sourceDataNode)
- n, is_deleted, err = s.doReadRemoteEcShardInterval(ctx, sourceDataNode, needleId, vid, shardId, buf, offset)
+ n, is_deleted, err = s.doReadRemoteEcShardInterval(sourceDataNode, needleId, vid, shardId, buf, offset)
if err == nil {
return
}
@@ -299,12 +276,12 @@ func (s *Store) readRemoteEcShardInterval(ctx context.Context, sourceDataNodes [
return
}
-func (s *Store) doReadRemoteEcShardInterval(ctx context.Context, sourceDataNode string, needleId types.NeedleId, vid needle.VolumeId, shardId erasure_coding.ShardId, buf []byte, offset int64) (n int, is_deleted bool, err error) {
+func (s *Store) doReadRemoteEcShardInterval(sourceDataNode string, needleId types.NeedleId, vid needle.VolumeId, shardId erasure_coding.ShardId, buf []byte, offset int64) (n int, is_deleted bool, err error) {
err = operation.WithVolumeServerClient(sourceDataNode, s.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
// copy data slice
- shardReadClient, err := client.VolumeEcShardRead(ctx, &volume_server_pb.VolumeEcShardReadRequest{
+ shardReadClient, err := client.VolumeEcShardRead(context.Background(), &volume_server_pb.VolumeEcShardReadRequest{
VolumeId: uint32(vid),
ShardId: uint32(shardId),
Offset: offset,
@@ -339,7 +316,7 @@ func (s *Store) doReadRemoteEcShardInterval(ctx context.Context, sourceDataNode
return
}
-func (s *Store) recoverOneRemoteEcShardInterval(ctx context.Context, needleId types.NeedleId, ecVolume *erasure_coding.EcVolume, shardIdToRecover erasure_coding.ShardId, buf []byte, offset int64) (n int, is_deleted bool, err error) {
+func (s *Store) recoverOneRemoteEcShardInterval(needleId types.NeedleId, ecVolume *erasure_coding.EcVolume, shardIdToRecover erasure_coding.ShardId, buf []byte, offset int64) (n int, is_deleted bool, err error) {
glog.V(3).Infof("recover ec shard %d.%d from other locations", ecVolume.VolumeId, shardIdToRecover)
enc, err := reedsolomon.New(erasure_coding.DataShardsCount, erasure_coding.ParityShardsCount)
@@ -367,7 +344,7 @@ func (s *Store) recoverOneRemoteEcShardInterval(ctx context.Context, needleId ty
go func(shardId erasure_coding.ShardId, locations []string) {
defer wg.Done()
data := make([]byte, len(buf))
- nRead, isDeleted, readErr := s.readRemoteEcShardInterval(ctx, locations, needleId, ecVolume.VolumeId, shardId, data, offset)
+ nRead, isDeleted, readErr := s.readRemoteEcShardInterval(locations, needleId, ecVolume.VolumeId, shardId, data, offset)
if readErr != nil {
glog.V(3).Infof("recover: readRemoteEcShardInterval %d.%d %d bytes from %+v: %v", ecVolume.VolumeId, shardId, nRead, locations, readErr)
forgetShardId(ecVolume, shardId)
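
The recovery path above reduces to reedsolomon.Reconstruct: fetch any 10 of the 14 shards for the interval, leave missing ones nil, and regenerate the rest. The core step in isolation; all shard slices must have equal length, and the 10/4 split matches erasure_coding.DataShardsCount and ParityShardsCount:

    func recoverShard(shards [][]byte, shardIdToRecover int) ([]byte, error) {
        enc, err := reedsolomon.New(10, 4)
        if err != nil {
            return nil, err
        }
        // nil entries mark missing shards; any 10 surviving shards suffice
        if err := enc.Reconstruct(shards); err != nil {
            return nil, err
        }
        return shards[shardIdToRecover], nil
    }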
diff --git a/weed/storage/store_ec_delete.go b/weed/storage/store_ec_delete.go
index e027d2887..4a75fb20b 100644
--- a/weed/storage/store_ec_delete.go
+++ b/weed/storage/store_ec_delete.go
@@ -12,9 +12,9 @@ import (
"github.com/chrislusf/seaweedfs/weed/storage/types"
)
-func (s *Store) DeleteEcShardNeedle(ctx context.Context, ecVolume *erasure_coding.EcVolume, n *needle.Needle, cookie types.Cookie) (int64, error) {
+func (s *Store) DeleteEcShardNeedle(ecVolume *erasure_coding.EcVolume, n *needle.Needle, cookie types.Cookie) (int64, error) {
- count, err := s.ReadEcShardNeedle(ctx, ecVolume.VolumeId, n)
+ count, err := s.ReadEcShardNeedle(ecVolume.VolumeId, n)
if err != nil {
return 0, err
@@ -24,7 +24,7 @@ func (s *Store) DeleteEcShardNeedle(ctx context.Context, ecVolume *erasure_codin
return 0, fmt.Errorf("unexpected cookie %x", cookie)
}
- if err = s.doDeleteNeedleFromAtLeastOneRemoteEcShards(ctx, ecVolume, n.Id); err != nil {
+ if err = s.doDeleteNeedleFromAtLeastOneRemoteEcShards(ecVolume, n.Id); err != nil {
return 0, err
}
@@ -32,7 +32,7 @@ func (s *Store) DeleteEcShardNeedle(ctx context.Context, ecVolume *erasure_codin
}
-func (s *Store) doDeleteNeedleFromAtLeastOneRemoteEcShards(ctx context.Context, ecVolume *erasure_coding.EcVolume, needleId types.NeedleId) error {
+func (s *Store) doDeleteNeedleFromAtLeastOneRemoteEcShards(ecVolume *erasure_coding.EcVolume, needleId types.NeedleId) error {
_, _, intervals, err := ecVolume.LocateEcShardNeedle(needleId, ecVolume.Version)
@@ -43,13 +43,13 @@ func (s *Store) doDeleteNeedleFromAtLeastOneRemoteEcShards(ctx context.Context,
shardId, _ := intervals[0].ToShardIdAndOffset(erasure_coding.ErasureCodingLargeBlockSize, erasure_coding.ErasureCodingSmallBlockSize)
hasDeletionSuccess := false
- err = s.doDeleteNeedleFromRemoteEcShardServers(ctx, shardId, ecVolume, needleId)
+ err = s.doDeleteNeedleFromRemoteEcShardServers(shardId, ecVolume, needleId)
if err == nil {
hasDeletionSuccess = true
}
for shardId = erasure_coding.DataShardsCount; shardId < erasure_coding.TotalShardsCount; shardId++ {
- if parityDeletionError := s.doDeleteNeedleFromRemoteEcShardServers(ctx, shardId, ecVolume, needleId); parityDeletionError == nil {
+ if parityDeletionError := s.doDeleteNeedleFromRemoteEcShardServers(shardId, ecVolume, needleId); parityDeletionError == nil {
hasDeletionSuccess = true
}
}
@@ -62,7 +62,7 @@ func (s *Store) doDeleteNeedleFromAtLeastOneRemoteEcShards(ctx context.Context,
}
-func (s *Store) doDeleteNeedleFromRemoteEcShardServers(ctx context.Context, shardId erasure_coding.ShardId, ecVolume *erasure_coding.EcVolume, needleId types.NeedleId) error {
+func (s *Store) doDeleteNeedleFromRemoteEcShardServers(shardId erasure_coding.ShardId, ecVolume *erasure_coding.EcVolume, needleId types.NeedleId) error {
ecVolume.ShardLocationsLock.RLock()
sourceDataNodes, hasShardLocations := ecVolume.ShardLocations[shardId]
@@ -74,7 +74,7 @@ func (s *Store) doDeleteNeedleFromRemoteEcShardServers(ctx context.Context, shar
for _, sourceDataNode := range sourceDataNodes {
glog.V(4).Infof("delete from remote ec shard %d.%d from %s", ecVolume.VolumeId, shardId, sourceDataNode)
- err := s.doDeleteNeedleFromRemoteEcShard(ctx, sourceDataNode, ecVolume.VolumeId, ecVolume.Collection, ecVolume.Version, needleId)
+ err := s.doDeleteNeedleFromRemoteEcShard(sourceDataNode, ecVolume.VolumeId, ecVolume.Collection, ecVolume.Version, needleId)
if err != nil {
return err
}
@@ -85,12 +85,12 @@ func (s *Store) doDeleteNeedleFromRemoteEcShardServers(ctx context.Context, shar
}
-func (s *Store) doDeleteNeedleFromRemoteEcShard(ctx context.Context, sourceDataNode string, vid needle.VolumeId, collection string, version needle.Version, needleId types.NeedleId) error {
+func (s *Store) doDeleteNeedleFromRemoteEcShard(sourceDataNode string, vid needle.VolumeId, collection string, version needle.Version, needleId types.NeedleId) error {
return operation.WithVolumeServerClient(sourceDataNode, s.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
// copy data slice
- _, err := client.VolumeEcBlobDelete(ctx, &volume_server_pb.VolumeEcBlobDeleteRequest{
+ _, err := client.VolumeEcBlobDelete(context.Background(), &volume_server_pb.VolumeEcBlobDeleteRequest{
VolumeId: uint32(vid),
Collection: collection,
FileKey: uint64(needleId),
diff --git a/weed/storage/store_vacuum.go b/weed/storage/store_vacuum.go
index b1f1a6277..e94d9b516 100644
--- a/weed/storage/store_vacuum.go
+++ b/weed/storage/store_vacuum.go
@@ -16,7 +16,8 @@ func (s *Store) CheckCompactVolume(volumeId needle.VolumeId) (float64, error) {
}
func (s *Store) CompactVolume(vid needle.VolumeId, preallocate int64, compactionBytePerSecond int64) error {
if v := s.findVolume(vid); v != nil {
- return v.Compact(preallocate, compactionBytePerSecond)
+ return v.Compact2(preallocate) // Compact2 does not yet honor compactionBytePerSecond
+ // return v.Compact(preallocate, compactionBytePerSecond)
}
return fmt.Errorf("volume id %d is not found during compact", vid)
}
diff --git a/weed/storage/replica_placement.go b/weed/storage/super_block/replica_placement.go
index c1aca52eb..fcccbba7d 100644
--- a/weed/storage/replica_placement.go
+++ b/weed/storage/super_block/replica_placement.go
@@ -1,4 +1,4 @@
-package storage
+package super_block
import (
"errors"
diff --git a/weed/storage/replica_placement_test.go b/weed/storage/super_block/replica_placement_test.go
index 7968af7cb..7742ba548 100644
--- a/weed/storage/replica_placement_test.go
+++ b/weed/storage/super_block/replica_placement_test.go
@@ -1,4 +1,4 @@
-package storage
+package super_block
import (
"testing"
diff --git a/weed/storage/super_block/super_block.go b/weed/storage/super_block/super_block.go
new file mode 100644
index 000000000..f48cd0bdc
--- /dev/null
+++ b/weed/storage/super_block/super_block.go
@@ -0,0 +1,69 @@
+package super_block
+
+import (
+ "github.com/golang/protobuf/proto"
+
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
+ "github.com/chrislusf/seaweedfs/weed/storage/needle"
+ "github.com/chrislusf/seaweedfs/weed/util"
+)
+
+const (
+ SuperBlockSize = 8
+)
+
+/*
+* Super block currently has 8 bytes allocated for each volume.
+* Byte 0: version, 1, 2, or 3
+* Byte 1: Replica Placement strategy, 000, 001, 002, 010, etc
+* Byte 2 and byte 3: Time to live. See TTL for definition
+* Byte 4 and byte 5: The number of times the volume has been compacted.
+* Rest bytes: Reserved
+ */
+type SuperBlock struct {
+ Version needle.Version
+ ReplicaPlacement *ReplicaPlacement
+ Ttl *needle.TTL
+ CompactionRevision uint16
+ Extra *master_pb.SuperBlockExtra
+ ExtraSize uint16
+}
+
+func (s *SuperBlock) BlockSize() int {
+ switch s.Version {
+ case needle.Version2, needle.Version3:
+ return SuperBlockSize + int(s.ExtraSize)
+ }
+ return SuperBlockSize
+}
+
+func (s *SuperBlock) Bytes() []byte {
+ header := make([]byte, SuperBlockSize)
+ header[0] = byte(s.Version)
+ header[1] = s.ReplicaPlacement.Byte()
+ s.Ttl.ToBytes(header[2:4])
+ util.Uint16toBytes(header[4:6], s.CompactionRevision)
+
+ if s.Extra != nil {
+ extraData, err := proto.Marshal(s.Extra)
+ if err != nil {
+ glog.Fatalf("cannot marshal super block extra %+v: %v", s.Extra, err)
+ }
+ extraSize := len(extraData)
+ if extraSize > 256*256-2 {
+ // reserve a couple of size values for future extension
+ glog.Fatalf("super block extra size %d exceeds the limit %d", extraSize, 256*256-2)
+ }
+ s.ExtraSize = uint16(extraSize)
+ util.Uint16toBytes(header[6:8], s.ExtraSize)
+
+ header = append(header, extraData...)
+ }
+
+ return header
+}
+
+func (s *SuperBlock) Initialized() bool {
+ return s.ReplicaPlacement != nil && s.Ttl != nil
+}
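
Given the layout comment above, building a super block and dumping its 8 header bytes is a quick sanity check. A sketch using only identifiers from this package and the needle package (assumes the fmt import):

    rp, _ := NewReplicaPlacementFromString("001")
    ttl, _ := needle.ReadTTL("15d")
    sb := &SuperBlock{Version: needle.CurrentVersion, ReplicaPlacement: rp, Ttl: ttl}
    header := sb.Bytes()
    // header[0]=version, header[1]=replica placement, header[2:4]=TTL,
    // header[4:6]=compaction revision, header[6:8]=extra size
    fmt.Printf("% x\n", header)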
diff --git a/weed/storage/super_block/super_block_read.go.go b/weed/storage/super_block/super_block_read.go.go
new file mode 100644
index 000000000..9eb12e116
--- /dev/null
+++ b/weed/storage/super_block/super_block_read.go.go
@@ -0,0 +1,44 @@
+package super_block
+
+import (
+ "fmt"
+
+ "github.com/golang/protobuf/proto"
+
+ "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
+ "github.com/chrislusf/seaweedfs/weed/storage/backend"
+ "github.com/chrislusf/seaweedfs/weed/storage/needle"
+ "github.com/chrislusf/seaweedfs/weed/util"
+)
+
+// ReadSuperBlock reads the super block from the data file and loads it into the volume
+func ReadSuperBlock(datBackend backend.BackendStorageFile) (superBlock SuperBlock, err error) {
+
+ header := make([]byte, SuperBlockSize)
+ if _, e := datBackend.ReadAt(header, 0); e != nil {
+ err = fmt.Errorf("cannot read volume %s super block: %v", datBackend.Name(), e)
+ return
+ }
+
+ superBlock.Version = needle.Version(header[0])
+ if superBlock.ReplicaPlacement, err = NewReplicaPlacementFromByte(header[1]); err != nil {
+ err = fmt.Errorf("cannot read replica type: %s", err.Error())
+ return
+ }
+ superBlock.Ttl = needle.LoadTTLFromBytes(header[2:4])
+ superBlock.CompactionRevision = util.BytesToUint16(header[4:6])
+ superBlock.ExtraSize = util.BytesToUint16(header[6:8])
+
+ if superBlock.ExtraSize > 0 {
+ // read the extra data that follows the fixed 8-byte header
+ extraData := make([]byte, int(superBlock.ExtraSize))
+ if _, e := datBackend.ReadAt(extraData, SuperBlockSize); e != nil {
+ err = fmt.Errorf("cannot read volume %s super block extra: %v", datBackend.Name(), e)
+ return
+ }
+ superBlock.Extra = &master_pb.SuperBlockExtra{}
+ err = proto.Unmarshal(extraData, superBlock.Extra)
+ if err != nil {
+ err = fmt.Errorf("cannot unmarshal volume %s super block extra: %v", datBackend.Name(), err)
+ return
+ }
+ }
+
+ return
+}
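
A minimal read path for the new function: open a volume's .dat file through the disk backend and parse its super block. The path is hypothetical; NewDiskFile wraps an *os.File elsewhere in this tree:

    f, err := os.Open("/data/1.dat") // hypothetical volume data file
    if err != nil {
        return err
    }
    datBackend := backend.NewDiskFile(f)
    defer datBackend.Close()
    sb, err := super_block.ReadSuperBlock(datBackend)
    if err != nil {
        return err
    }
    fmt.Println("version:", sb.Version, "replication:", sb.ReplicaPlacement)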
diff --git a/weed/storage/volume_super_block_test.go b/weed/storage/super_block/super_block_test.go
index 06ad8a5d3..25699070d 100644
--- a/weed/storage/volume_super_block_test.go
+++ b/weed/storage/super_block/super_block_test.go
@@ -1,4 +1,4 @@
-package storage
+package super_block
import (
"testing"
@@ -10,7 +10,7 @@ func TestSuperBlockReadWrite(t *testing.T) {
rp, _ := NewReplicaPlacementFromByte(byte(001))
ttl, _ := needle.ReadTTL("15d")
s := &SuperBlock{
- version: needle.CurrentVersion,
+ Version: needle.CurrentVersion,
ReplicaPlacement: rp,
Ttl: ttl,
}
diff --git a/weed/storage/volume.go b/weed/storage/volume.go
index e85696eab..7da83de7a 100644
--- a/weed/storage/volume.go
+++ b/weed/storage/volume.go
@@ -2,18 +2,19 @@ package storage
import (
"fmt"
+ "path"
+ "strconv"
+ "sync"
+ "time"
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
+ "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
"github.com/chrislusf/seaweedfs/weed/stats"
"github.com/chrislusf/seaweedfs/weed/storage/backend"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
+ "github.com/chrislusf/seaweedfs/weed/storage/super_block"
"github.com/chrislusf/seaweedfs/weed/storage/types"
- "path"
- "strconv"
- "sync"
- "time"
-
"github.com/chrislusf/seaweedfs/weed/glog"
)
@@ -21,15 +22,17 @@ type Volume struct {
Id needle.VolumeId
dir string
Collection string
- DataBackend backend.DataStorageBackend
+ DataBackend backend.BackendStorageFile
nm NeedleMapper
needleMapKind NeedleMapType
- readOnly bool
+ noWriteOrDelete bool // readonly: neither writes nor deletes are allowed
+ noWriteCanDelete bool // readonly for writes, but deletes are still allowed
+ hasRemoteFile bool // if the volume has a remote file
MemoryMapMaxSizeMb uint32
- SuperBlock
+ super_block.SuperBlock
- dataFileAccessLock sync.Mutex
+ dataFileAccessLock sync.RWMutex
lastModifiedTsSeconds uint64 //unix time in seconds
lastAppendAtNs uint64 //unix time in nanoseconds
@@ -37,18 +40,20 @@ type Volume struct {
lastCompactRevision uint16
isCompacting bool
+
+ volumeInfo *volume_server_pb.VolumeInfo
}
-func NewVolume(dirname string, collection string, id needle.VolumeId, needleMapKind NeedleMapType, replicaPlacement *ReplicaPlacement, ttl *needle.TTL, preallocate int64, memoryMapMaxSizeMb uint32) (v *Volume, e error) {
+func NewVolume(dirname string, collection string, id needle.VolumeId, needleMapKind NeedleMapType, replicaPlacement *super_block.ReplicaPlacement, ttl *needle.TTL, preallocate int64, memoryMapMaxSizeMb uint32) (v *Volume, e error) {
// if replicaPlacement is nil, the superblock will be loaded from disk
v = &Volume{dir: dirname, Collection: collection, Id: id, MemoryMapMaxSizeMb: memoryMapMaxSizeMb}
- v.SuperBlock = SuperBlock{ReplicaPlacement: replicaPlacement, Ttl: ttl}
+ v.SuperBlock = super_block.SuperBlock{ReplicaPlacement: replicaPlacement, Ttl: ttl}
v.needleMapKind = needleMapKind
e = v.load(true, true, needleMapKind, preallocate)
return
}
func (v *Volume) String() string {
- return fmt.Sprintf("Id:%v, dir:%s, Collection:%s, dataFile:%v, nm:%v, readOnly:%v", v.Id, v.dir, v.Collection, v.DataBackend, v.nm, v.readOnly)
+ return fmt.Sprintf("Id:%v, dir:%s, Collection:%s, dataFile:%v, nm:%v, noWrite:%v canDelete:%v", v.Id, v.dir, v.Collection, v.DataBackend, v.nm, v.noWriteOrDelete || v.noWriteCanDelete, v.noWriteCanDelete)
}
func VolumeFileName(dir string, collection string, id int) (fileName string) {
@@ -65,12 +70,15 @@ func (v *Volume) FileName() (fileName string) {
}
func (v *Volume) Version() needle.Version {
- return v.SuperBlock.Version()
+ if v.volumeInfo != nil && v.volumeInfo.Version != 0 {
+ v.SuperBlock.Version = needle.Version(v.volumeInfo.Version)
+ }
+ return v.SuperBlock.Version
}
func (v *Volume) FileStat() (datSize uint64, idxSize uint64, modTime time.Time) {
- v.dataFileAccessLock.Lock()
- defer v.dataFileAccessLock.Unlock()
+ v.dataFileAccessLock.RLock()
+ defer v.dataFileAccessLock.RUnlock()
if v.DataBackend == nil {
return
@@ -80,13 +88,13 @@ func (v *Volume) FileStat() (datSize uint64, idxSize uint64, modTime time.Time)
if e == nil {
return uint64(datFileSize), v.nm.IndexFileSize(), modTime
}
- glog.V(0).Infof("Failed to read file size %s %v", v.DataBackend.String(), e)
+ glog.V(0).Infof("Failed to read file size %s %v", v.DataBackend.Name(), e)
return // -1 causes integer overflow and the volume to become unwritable.
}
func (v *Volume) ContentSize() uint64 {
- v.dataFileAccessLock.Lock()
- defer v.dataFileAccessLock.Unlock()
+ v.dataFileAccessLock.RLock()
+ defer v.dataFileAccessLock.RUnlock()
if v.nm == nil {
return 0
}
@@ -94,8 +102,8 @@ func (v *Volume) ContentSize() uint64 {
}
func (v *Volume) DeletedSize() uint64 {
- v.dataFileAccessLock.Lock()
- defer v.dataFileAccessLock.Unlock()
+ v.dataFileAccessLock.RLock()
+ defer v.dataFileAccessLock.RUnlock()
if v.nm == nil {
return 0
}
@@ -103,8 +111,8 @@ func (v *Volume) DeletedSize() uint64 {
}
func (v *Volume) FileCount() uint64 {
- v.dataFileAccessLock.Lock()
- defer v.dataFileAccessLock.Unlock()
+ v.dataFileAccessLock.RLock()
+ defer v.dataFileAccessLock.RUnlock()
if v.nm == nil {
return 0
}
@@ -112,8 +120,8 @@ func (v *Volume) FileCount() uint64 {
}
func (v *Volume) DeletedCount() uint64 {
- v.dataFileAccessLock.Lock()
- defer v.dataFileAccessLock.Unlock()
+ v.dataFileAccessLock.RLock()
+ defer v.dataFileAccessLock.RUnlock()
if v.nm == nil {
return 0
}
@@ -121,8 +129,8 @@ func (v *Volume) DeletedCount() uint64 {
}
func (v *Volume) MaxFileKey() types.NeedleId {
- v.dataFileAccessLock.Lock()
- defer v.dataFileAccessLock.Unlock()
+ v.dataFileAccessLock.RLock()
+ defer v.dataFileAccessLock.RUnlock()
if v.nm == nil {
return 0
}
@@ -130,8 +138,8 @@ func (v *Volume) MaxFileKey() types.NeedleId {
}
func (v *Volume) IndexFileSize() uint64 {
- v.dataFileAccessLock.Lock()
- defer v.dataFileAccessLock.Unlock()
+ v.dataFileAccessLock.RLock()
+ defer v.dataFileAccessLock.RUnlock()
if v.nm == nil {
return 0
}
@@ -172,9 +180,9 @@ func (v *Volume) expired(volumeSizeLimit uint64) bool {
if v.Ttl == nil || v.Ttl.Minutes() == 0 {
return false
}
- glog.V(1).Infof("now:%v lastModified:%v", time.Now().Unix(), v.lastModifiedTsSeconds)
+ glog.V(2).Infof("now:%v lastModified:%v", time.Now().Unix(), v.lastModifiedTsSeconds)
livedMinutes := (time.Now().Unix() - int64(v.lastModifiedTsSeconds)) / 60
- glog.V(1).Infof("ttl:%v lived:%v", v.Ttl, livedMinutes)
+ glog.V(2).Infof("ttl:%v lived:%v", v.Ttl, livedMinutes)
if int64(v.Ttl.Minutes()) < livedMinutes {
return true
}
@@ -200,18 +208,32 @@ func (v *Volume) expiredLongEnough(maxDelayMinutes uint32) bool {
func (v *Volume) ToVolumeInformationMessage() *master_pb.VolumeInformationMessage {
size, _, modTime := v.FileStat()
- return &master_pb.VolumeInformationMessage{
+ volumeInfo := &master_pb.VolumeInformationMessage{
Id: uint32(v.Id),
Size: size,
Collection: v.Collection,
- FileCount: uint64(v.FileCount()),
- DeleteCount: uint64(v.DeletedCount()),
+ FileCount: v.FileCount(),
+ DeleteCount: v.DeletedCount(),
DeletedByteCount: v.DeletedSize(),
- ReadOnly: v.readOnly,
+ ReadOnly: v.noWriteOrDelete || v.noWriteCanDelete,
ReplicaPlacement: uint32(v.ReplicaPlacement.Byte()),
Version: uint32(v.Version()),
Ttl: v.Ttl.ToUint32(),
CompactRevision: uint32(v.SuperBlock.CompactionRevision),
ModifiedAtSecond: modTime.Unix(),
}
+
+ volumeInfo.RemoteStorageName, volumeInfo.RemoteStorageKey = v.RemoteStorageNameKey()
+
+ return volumeInfo
+}
+
+func (v *Volume) RemoteStorageNameKey() (storageName, storageKey string) {
+ if v.volumeInfo == nil {
+ return
+ }
+ if len(v.volumeInfo.GetFiles()) == 0 {
+ return
+ }
+ return v.volumeInfo.GetFiles()[0].BackendName(), v.volumeInfo.GetFiles()[0].GetKey()
}
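
The switch from sync.Mutex to sync.RWMutex above lets the many read-only accessors (FileStat, ContentSize, FileCount, and friends) run concurrently while writes still serialize. The pattern in miniature, standard library only:

    type counter struct {
        mu sync.RWMutex
        n  uint64
    }

    func (c *counter) Get() uint64 {
        c.mu.RLock() // readers share the lock and do not block each other
        defer c.mu.RUnlock()
        return c.n
    }

    func (c *counter) Inc() {
        c.mu.Lock() // writers take the lock exclusively
        defer c.mu.Unlock()
        c.n++
    }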
diff --git a/weed/storage/volume_backup.go b/weed/storage/volume_backup.go
index fe0506917..f7075fe2b 100644
--- a/weed/storage/volume_backup.go
+++ b/weed/storage/volume_backup.go
@@ -6,17 +6,19 @@ import (
"io"
"os"
+ "google.golang.org/grpc"
+
"github.com/chrislusf/seaweedfs/weed/operation"
"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
"github.com/chrislusf/seaweedfs/weed/storage/idx"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
+ "github.com/chrislusf/seaweedfs/weed/storage/super_block"
. "github.com/chrislusf/seaweedfs/weed/storage/types"
- "google.golang.org/grpc"
)
func (v *Volume) GetVolumeSyncStatus() *volume_server_pb.VolumeSyncStatusResponse {
- v.dataFileAccessLock.Lock()
- defer v.dataFileAccessLock.Unlock()
+ v.dataFileAccessLock.RLock()
+ defer v.dataFileAccessLock.RUnlock()
var syncStatus = &volume_server_pb.VolumeSyncStatusResponse{}
if datSize, _, err := v.DataBackend.GetStat(); err == nil {
@@ -62,8 +64,6 @@ update needle map when receiving new .dat bytes. But seems not necessary now.)
func (v *Volume) IncrementalBackup(volumeServer string, grpcDialOption grpc.DialOption) error {
- ctx := context.Background()
-
startFromOffset, _, _ := v.FileStat()
appendAtNs, err := v.findLastAppendAtNs()
if err != nil {
@@ -74,7 +74,7 @@ func (v *Volume) IncrementalBackup(volumeServer string, grpcDialOption grpc.Dial
err = operation.WithVolumeServerClient(volumeServer, grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
- stream, err := client.VolumeIncrementalCopy(ctx, &volume_server_pb.VolumeIncrementalCopyRequest{
+ stream, err := client.VolumeIncrementalCopy(context.Background(), &volume_server_pb.VolumeIncrementalCopyRequest{
VolumeId: uint32(v.Id),
SinceNs: appendAtNs,
})
@@ -108,7 +108,7 @@ func (v *Volume) IncrementalBackup(volumeServer string, grpcDialOption grpc.Dial
}
// add to needle map
- return ScanVolumeFileFrom(v.version, v.DataBackend, int64(startFromOffset), &VolumeFileScanner4GenIdx{v: v})
+ return ScanVolumeFileFrom(v.Version(), v.DataBackend, int64(startFromOffset), &VolumeFileScanner4GenIdx{v: v})
}
@@ -154,11 +154,11 @@ func (v *Volume) locateLastAppendEntry() (Offset, error) {
func (v *Volume) readAppendAtNs(offset Offset) (uint64, error) {
- n, _, bodyLength, err := needle.ReadNeedleHeader(v.DataBackend, v.SuperBlock.version, offset.ToAcutalOffset())
+ n, _, bodyLength, err := needle.ReadNeedleHeader(v.DataBackend, v.SuperBlock.Version, offset.ToAcutalOffset())
if err != nil {
return 0, fmt.Errorf("ReadNeedleHeader: %v", err)
}
- _, err = n.ReadNeedleBody(v.DataBackend, v.SuperBlock.version, offset.ToAcutalOffset()+int64(NeedleHeaderSize), bodyLength)
+ _, err = n.ReadNeedleBody(v.DataBackend, v.SuperBlock.Version, offset.ToAcutalOffset()+int64(NeedleHeaderSize), bodyLength)
if err != nil {
return 0, fmt.Errorf("ReadNeedleBody offset %d, bodyLength %d: %v", offset.ToAcutalOffset(), bodyLength, err)
}
@@ -244,7 +244,7 @@ type VolumeFileScanner4GenIdx struct {
v *Volume
}
-func (scanner *VolumeFileScanner4GenIdx) VisitSuperBlock(superBlock SuperBlock) error {
+func (scanner *VolumeFileScanner4GenIdx) VisitSuperBlock(superBlock super_block.SuperBlock) error {
return nil
}
diff --git a/weed/storage/volume_checking.go b/weed/storage/volume_checking.go
index 61b59e9f7..a65c2a3ff 100644
--- a/weed/storage/volume_checking.go
+++ b/weed/storage/volume_checking.go
@@ -55,7 +55,7 @@ func readIndexEntryAtOffset(indexFile *os.File, offset int64) (bytes []byte, err
return
}
-func verifyNeedleIntegrity(datFile backend.DataStorageBackend, v needle.Version, offset int64, key NeedleId, size uint32) (lastAppendAtNs uint64, err error) {
+func verifyNeedleIntegrity(datFile backend.BackendStorageFile, v needle.Version, offset int64, key NeedleId, size uint32) (lastAppendAtNs uint64, err error) {
n := new(needle.Needle)
if err = n.ReadData(datFile, offset, size, v); err != nil {
return n.AppendAtNs, err
diff --git a/weed/storage/volume_create.go b/weed/storage/volume_create.go
index b27a62990..ffcb246a4 100644
--- a/weed/storage/volume_create.go
+++ b/weed/storage/volume_create.go
@@ -9,10 +9,13 @@ import (
"github.com/chrislusf/seaweedfs/weed/storage/backend"
)
-func createVolumeFile(fileName string, preallocate int64, memoryMapSizeMB uint32) (backend.DataStorageBackend, error) {
+func createVolumeFile(fileName string, preallocate int64, memoryMapSizeMB uint32) (backend.BackendStorageFile, error) {
file, e := os.OpenFile(fileName, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0644)
+ if e != nil {
+ return nil, e
+ }
if preallocate > 0 {
glog.V(0).Infof("Preallocated disk space for %s is not supported", fileName)
}
- return backend.NewDiskFile(file), e
+ return backend.NewDiskFile(file), nil
}
diff --git a/weed/storage/volume_create_linux.go b/weed/storage/volume_create_linux.go
index e3305d991..ee599ac32 100644
--- a/weed/storage/volume_create_linux.go
+++ b/weed/storage/volume_create_linux.go
@@ -10,11 +10,14 @@ import (
"github.com/chrislusf/seaweedfs/weed/storage/backend"
)
-func createVolumeFile(fileName string, preallocate int64, memoryMapSizeMB uint32) (backend.DataStorageBackend, error) {
+func createVolumeFile(fileName string, preallocate int64, memoryMapSizeMB uint32) (backend.BackendStorageFile, error) {
file, e := os.OpenFile(fileName, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0644)
+ if e != nil {
+ return nil, e
+ }
if preallocate != 0 {
syscall.Fallocate(int(file.Fd()), 1, 0, preallocate)
glog.V(0).Infof("Preallocated %d bytes disk space for %s", preallocate, fileName)
}
- return backend.NewDiskFile(file), e
+ return backend.NewDiskFile(file), nil
}
diff --git a/weed/storage/volume_create_windows.go b/weed/storage/volume_create_windows.go
index 81536810b..e1c0b961f 100644
--- a/weed/storage/volume_create_windows.go
+++ b/weed/storage/volume_create_windows.go
@@ -11,18 +11,23 @@ import (
"github.com/chrislusf/seaweedfs/weed/storage/backend/memory_map/os_overloads"
)
-func createVolumeFile(fileName string, preallocate int64, memoryMapSizeMB uint32) (backend.DataStorageBackend, error) {
-
+func createVolumeFile(fileName string, preallocate int64, memoryMapSizeMB uint32) (backend.BackendStorageFile, error) {
if preallocate > 0 {
glog.V(0).Infof("Preallocated disk space for %s is not supported", fileName)
}
if memoryMapSizeMB > 0 {
file, e := os_overloads.OpenFile(fileName, windows.O_RDWR|windows.O_CREAT, 0644, true)
- return memory_map.NewMemoryMappedFile(file, memoryMapSizeMB), e
+ if e != nil {
+ return nil, e
+ }
+ return memory_map.NewMemoryMappedFile(file, memoryMapSizeMB), nil
} else {
file, e := os_overloads.OpenFile(fileName, windows.O_RDWR|windows.O_CREAT|windows.O_TRUNC, 0644, false)
- return backend.NewDiskFile(file), e
+ if e != nil {
+ return nil, e
+ }
+ return backend.NewDiskFile(file), nil
}
}
diff --git a/weed/storage/volume_info.go b/weed/storage/volume_info.go
index 111058b6e..313818cde 100644
--- a/weed/storage/volume_info.go
+++ b/weed/storage/volume_info.go
@@ -6,37 +6,42 @@ import (
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
+ "github.com/chrislusf/seaweedfs/weed/storage/super_block"
)
type VolumeInfo struct {
- Id needle.VolumeId
- Size uint64
- ReplicaPlacement *ReplicaPlacement
- Ttl *needle.TTL
- Collection string
- Version needle.Version
- FileCount int
- DeleteCount int
- DeletedByteCount uint64
- ReadOnly bool
- CompactRevision uint32
- ModifiedAtSecond int64
+ Id needle.VolumeId
+ Size uint64
+ ReplicaPlacement *super_block.ReplicaPlacement
+ Ttl *needle.TTL
+ Collection string
+ Version needle.Version
+ FileCount int
+ DeleteCount int
+ DeletedByteCount uint64
+ ReadOnly bool
+ CompactRevision uint32
+ ModifiedAtSecond int64
+ RemoteStorageName string
+ RemoteStorageKey string
}
func NewVolumeInfo(m *master_pb.VolumeInformationMessage) (vi VolumeInfo, err error) {
vi = VolumeInfo{
- Id: needle.VolumeId(m.Id),
- Size: m.Size,
- Collection: m.Collection,
- FileCount: int(m.FileCount),
- DeleteCount: int(m.DeleteCount),
- DeletedByteCount: m.DeletedByteCount,
- ReadOnly: m.ReadOnly,
- Version: needle.Version(m.Version),
- CompactRevision: m.CompactRevision,
- ModifiedAtSecond: m.ModifiedAtSecond,
+ Id: needle.VolumeId(m.Id),
+ Size: m.Size,
+ Collection: m.Collection,
+ FileCount: int(m.FileCount),
+ DeleteCount: int(m.DeleteCount),
+ DeletedByteCount: m.DeletedByteCount,
+ ReadOnly: m.ReadOnly,
+ Version: needle.Version(m.Version),
+ CompactRevision: m.CompactRevision,
+ ModifiedAtSecond: m.ModifiedAtSecond,
+ RemoteStorageName: m.RemoteStorageName,
+ RemoteStorageKey: m.RemoteStorageKey,
}
- rp, e := NewReplicaPlacementFromByte(byte(m.ReplicaPlacement))
+ rp, e := super_block.NewReplicaPlacementFromByte(byte(m.ReplicaPlacement))
if e != nil {
return vi, e
}
@@ -51,7 +56,7 @@ func NewVolumeInfoFromShort(m *master_pb.VolumeShortInformationMessage) (vi Volu
Collection: m.Collection,
Version: needle.Version(m.Version),
}
- rp, e := NewReplicaPlacementFromByte(byte(m.ReplicaPlacement))
+ rp, e := super_block.NewReplicaPlacementFromByte(byte(m.ReplicaPlacement))
if e != nil {
return vi, e
}
@@ -60,6 +65,10 @@ func NewVolumeInfoFromShort(m *master_pb.VolumeShortInformationMessage) (vi Volu
return vi, nil
}
+func (vi VolumeInfo) IsRemote() bool {
+ return vi.RemoteStorageName != ""
+}
+
func (vi VolumeInfo) String() string {
return fmt.Sprintf("Id:%d, Size:%d, ReplicaPlacement:%s, Collection:%s, Version:%v, FileCount:%d, DeleteCount:%d, DeletedByteCount:%d, ReadOnly:%v",
vi.Id, vi.Size, vi.ReplicaPlacement, vi.Collection, vi.Version, vi.FileCount, vi.DeleteCount, vi.DeletedByteCount, vi.ReadOnly)
@@ -67,18 +76,20 @@ func (vi VolumeInfo) String() string {
func (vi VolumeInfo) ToVolumeInformationMessage() *master_pb.VolumeInformationMessage {
return &master_pb.VolumeInformationMessage{
- Id: uint32(vi.Id),
- Size: uint64(vi.Size),
- Collection: vi.Collection,
- FileCount: uint64(vi.FileCount),
- DeleteCount: uint64(vi.DeleteCount),
- DeletedByteCount: vi.DeletedByteCount,
- ReadOnly: vi.ReadOnly,
- ReplicaPlacement: uint32(vi.ReplicaPlacement.Byte()),
- Version: uint32(vi.Version),
- Ttl: vi.Ttl.ToUint32(),
- CompactRevision: vi.CompactRevision,
- ModifiedAtSecond: vi.ModifiedAtSecond,
+ Id: uint32(vi.Id),
+ Size: uint64(vi.Size),
+ Collection: vi.Collection,
+ FileCount: uint64(vi.FileCount),
+ DeleteCount: uint64(vi.DeleteCount),
+ DeletedByteCount: vi.DeletedByteCount,
+ ReadOnly: vi.ReadOnly,
+ ReplicaPlacement: uint32(vi.ReplicaPlacement.Byte()),
+ Version: uint32(vi.Version),
+ Ttl: vi.Ttl.ToUint32(),
+ CompactRevision: vi.CompactRevision,
+ ModifiedAtSecond: vi.ModifiedAtSecond,
+ RemoteStorageName: vi.RemoteStorageName,
+ RemoteStorageKey: vi.RemoteStorageKey,
}
}
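
VolumeInfo now carries the tiering coordinates, and IsRemote is simply a check on RemoteStorageName. A hedged usage sketch (describeVolume is invented; VolumeInfo, NewVolumeInfo, and IsRemote are the ones added in this diff):

    package storage

    import (
        "fmt"

        "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
    )

    func describeVolume(m *master_pb.VolumeInformationMessage) (string, error) {
        vi, err := NewVolumeInfo(m)
        if err != nil {
            return "", err
        }
        if vi.IsRemote() {
            // RemoteStorageName and RemoteStorageKey round-trip unchanged
            // through ToVolumeInformationMessage
            return fmt.Sprintf("tiered volume %d on %s (key %s)",
                vi.Id, vi.RemoteStorageName, vi.RemoteStorageKey), nil
        }
        return fmt.Sprintf("local volume %d, %d bytes", vi.Id, vi.Size), nil
    }
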
diff --git a/weed/storage/volume_loading.go b/weed/storage/volume_loading.go
index 6f1d8fe40..6b42fc452 100644
--- a/weed/storage/volume_loading.go
+++ b/weed/storage/volume_loading.go
@@ -3,146 +3,148 @@ package storage
import (
"fmt"
"os"
- "time"
- "github.com/chrislusf/seaweedfs/weed/stats"
- "github.com/chrislusf/seaweedfs/weed/storage/backend"
- "github.com/chrislusf/seaweedfs/weed/storage/needle"
"github.com/syndtr/goleveldb/leveldb/opt"
"github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/stats"
+ "github.com/chrislusf/seaweedfs/weed/storage/backend"
+ "github.com/chrislusf/seaweedfs/weed/storage/needle"
+ "github.com/chrislusf/seaweedfs/weed/storage/super_block"
+ "github.com/chrislusf/seaweedfs/weed/util"
)
-func loadVolumeWithoutIndex(dirname string, collection string, id needle.VolumeId, needleMapKind NeedleMapType) (v *Volume, e error) {
+func loadVolumeWithoutIndex(dirname string, collection string, id needle.VolumeId, needleMapKind NeedleMapType) (v *Volume, err error) {
v = &Volume{dir: dirname, Collection: collection, Id: id}
- v.SuperBlock = SuperBlock{}
+ v.SuperBlock = super_block.SuperBlock{}
v.needleMapKind = needleMapKind
- e = v.load(false, false, needleMapKind, 0)
+ err = v.load(false, false, needleMapKind, 0)
return
}
-func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool, needleMapKind NeedleMapType, preallocate int64) error {
- var e error
+func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool, needleMapKind NeedleMapType, preallocate int64) (err error) {
fileName := v.FileName()
alreadyHasSuperBlock := false
- // open dat file
- if exists, canRead, canWrite, modifiedTime, fileSize := checkFile(fileName + ".dat"); exists {
+ hasVolumeInfoFile := v.maybeLoadVolumeInfo() && v.volumeInfo.Version != 0
+
+ if v.HasRemoteFile() {
+ v.noWriteCanDelete = true
+ v.noWriteOrDelete = false
+ glog.V(0).Infof("loading volume %d from remote %v", v.Id, v.volumeInfo.Files)
+ v.LoadRemoteFile()
+ alreadyHasSuperBlock = true
+ } else if exists, canRead, canWrite, modifiedTime, fileSize := util.CheckFile(fileName + ".dat"); exists {
+ // open dat file
if !canRead {
return fmt.Errorf("cannot read Volume Data file %s.dat", fileName)
}
var dataFile *os.File
if canWrite {
- dataFile, e = os.OpenFile(fileName+".dat", os.O_RDWR|os.O_CREATE, 0644)
+ dataFile, err = os.OpenFile(fileName+".dat", os.O_RDWR|os.O_CREATE, 0644)
} else {
glog.V(0).Infoln("opening " + fileName + ".dat in READONLY mode")
- dataFile, e = os.Open(fileName + ".dat")
- v.readOnly = true
+ dataFile, err = os.Open(fileName + ".dat")
+ v.noWriteOrDelete = true
}
v.lastModifiedTsSeconds = uint64(modifiedTime.Unix())
- if fileSize >= _SuperBlockSize {
+ if fileSize >= super_block.SuperBlockSize {
alreadyHasSuperBlock = true
}
v.DataBackend = backend.NewDiskFile(dataFile)
} else {
if createDatIfMissing {
- v.DataBackend, e = createVolumeFile(fileName+".dat", preallocate, v.MemoryMapMaxSizeMb)
+ v.DataBackend, err = createVolumeFile(fileName+".dat", preallocate, v.MemoryMapMaxSizeMb)
} else {
return fmt.Errorf("Volume Data file %s.dat does not exist.", fileName)
}
}
- if e != nil {
- if !os.IsPermission(e) {
- return fmt.Errorf("cannot load Volume Data %s.dat: %v", fileName, e)
+ if err != nil {
+ if !os.IsPermission(err) {
+ return fmt.Errorf("cannot load Volume Data %s.dat: %v", fileName, err)
} else {
- return fmt.Errorf("load data file %s.dat: %v", fileName, e)
+ return fmt.Errorf("load data file %s.dat: %v", fileName, err)
}
}
if alreadyHasSuperBlock {
- e = v.readSuperBlock()
+ err = v.readSuperBlock()
} else {
if !v.SuperBlock.Initialized() {
return fmt.Errorf("volume %s.dat not initialized", fileName)
}
- e = v.maybeWriteSuperBlock()
+ err = v.maybeWriteSuperBlock()
}
- if e == nil && alsoLoadIndex {
+ if err == nil && alsoLoadIndex {
var indexFile *os.File
- if v.readOnly {
- glog.V(1).Infoln("open to read file", fileName+".idx")
- if indexFile, e = os.OpenFile(fileName+".idx", os.O_RDONLY, 0644); e != nil {
- return fmt.Errorf("cannot read Volume Index %s.idx: %v", fileName, e)
+ if v.noWriteOrDelete {
+ glog.V(0).Infoln("open to read file", fileName+".idx")
+ if indexFile, err = os.OpenFile(fileName+".idx", os.O_RDONLY, 0644); err != nil {
+ return fmt.Errorf("cannot read Volume Index %s.idx: %v", fileName, err)
}
} else {
glog.V(1).Infoln("open to write file", fileName+".idx")
- if indexFile, e = os.OpenFile(fileName+".idx", os.O_RDWR|os.O_CREATE, 0644); e != nil {
- return fmt.Errorf("cannot write Volume Index %s.idx: %v", fileName, e)
+ if indexFile, err = os.OpenFile(fileName+".idx", os.O_RDWR|os.O_CREATE, 0644); err != nil {
+ return fmt.Errorf("cannot write Volume Index %s.idx: %v", fileName, err)
}
}
- if v.lastAppendAtNs, e = CheckVolumeDataIntegrity(v, indexFile); e != nil {
- v.readOnly = true
- glog.V(0).Infof("volumeDataIntegrityChecking failed %v", e)
+ if v.lastAppendAtNs, err = CheckVolumeDataIntegrity(v, indexFile); err != nil {
+ v.noWriteOrDelete = true
+ glog.V(0).Infof("volumeDataIntegrityChecking failed %v", err)
}
- switch needleMapKind {
- case NeedleMapInMemory:
- glog.V(0).Infoln("loading index", fileName+".idx", "to memory readonly", v.readOnly)
- if v.nm, e = LoadCompactNeedleMap(indexFile); e != nil {
- glog.V(0).Infof("loading index %s to memory error: %v", fileName+".idx", e)
- }
- case NeedleMapLevelDb:
- glog.V(0).Infoln("loading leveldb", fileName+".ldb")
- opts := &opt.Options{
- BlockCacheCapacity: 2 * 1024 * 1024, // default value is 8MiB
- WriteBuffer: 1 * 1024 * 1024, // default value is 4MiB
- CompactionTableSizeMultiplier: 10, // default value is 1
- }
- if v.nm, e = NewLevelDbNeedleMap(fileName+".ldb", indexFile, opts); e != nil {
- glog.V(0).Infof("loading leveldb %s error: %v", fileName+".ldb", e)
- }
- case NeedleMapLevelDbMedium:
- glog.V(0).Infoln("loading leveldb medium", fileName+".ldb")
- opts := &opt.Options{
- BlockCacheCapacity: 4 * 1024 * 1024, // default value is 8MiB
- WriteBuffer: 2 * 1024 * 1024, // default value is 4MiB
- CompactionTableSizeMultiplier: 10, // default value is 1
- }
- if v.nm, e = NewLevelDbNeedleMap(fileName+".ldb", indexFile, opts); e != nil {
- glog.V(0).Infof("loading leveldb %s error: %v", fileName+".ldb", e)
- }
- case NeedleMapLevelDbLarge:
- glog.V(0).Infoln("loading leveldb large", fileName+".ldb")
- opts := &opt.Options{
- BlockCacheCapacity: 8 * 1024 * 1024, // default value is 8MiB
- WriteBuffer: 4 * 1024 * 1024, // default value is 4MiB
- CompactionTableSizeMultiplier: 10, // default value is 1
+
+ if v.noWriteOrDelete || v.noWriteCanDelete {
+ if v.nm, err = NewSortedFileNeedleMap(fileName, indexFile); err != nil {
+ glog.V(0).Infof("loading sorted db %s error: %v", fileName+".sdx", err)
}
- if v.nm, e = NewLevelDbNeedleMap(fileName+".ldb", indexFile, opts); e != nil {
- glog.V(0).Infof("loading leveldb %s error: %v", fileName+".ldb", e)
+ } else {
+ switch needleMapKind {
+ case NeedleMapInMemory:
+ glog.V(0).Infoln("loading index", fileName+".idx", "to memory")
+ if v.nm, err = LoadCompactNeedleMap(indexFile); err != nil {
+ glog.V(0).Infof("loading index %s to memory error: %v", fileName+".idx", err)
+ }
+ case NeedleMapLevelDb:
+ glog.V(0).Infoln("loading leveldb", fileName+".ldb")
+ opts := &opt.Options{
+ BlockCacheCapacity: 2 * 1024 * 1024, // default value is 8MiB
+ WriteBuffer: 1 * 1024 * 1024, // default value is 4MiB
+ CompactionTableSizeMultiplier: 10, // default value is 1
+ }
+ if v.nm, err = NewLevelDbNeedleMap(fileName+".ldb", indexFile, opts); err != nil {
+ glog.V(0).Infof("loading leveldb %s error: %v", fileName+".ldb", err)
+ }
+ case NeedleMapLevelDbMedium:
+ glog.V(0).Infoln("loading leveldb medium", fileName+".ldb")
+ opts := &opt.Options{
+ BlockCacheCapacity: 4 * 1024 * 1024, // default value is 8MiB
+ WriteBuffer: 2 * 1024 * 1024, // default value is 4MiB
+ CompactionTableSizeMultiplier: 10, // default value is 1
+ }
+ if v.nm, err = NewLevelDbNeedleMap(fileName+".ldb", indexFile, opts); err != nil {
+ glog.V(0).Infof("loading leveldb %s error: %v", fileName+".ldb", err)
+ }
+ case NeedleMapLevelDbLarge:
+ glog.V(0).Infoln("loading leveldb large", fileName+".ldb")
+ opts := &opt.Options{
+ BlockCacheCapacity: 8 * 1024 * 1024, // default value is 8MiB
+ WriteBuffer: 4 * 1024 * 1024, // default value is 4MiB
+ CompactionTableSizeMultiplier: 10, // default value is 1
+ }
+ if v.nm, err = NewLevelDbNeedleMap(fileName+".ldb", indexFile, opts); err != nil {
+ glog.V(0).Infof("loading leveldb %s error: %v", fileName+".ldb", err)
+ }
}
}
}
- stats.VolumeServerVolumeCounter.WithLabelValues(v.Collection, "volume").Inc()
+ if !hasVolumeInfoFile {
+ v.volumeInfo.Version = uint32(v.SuperBlock.Version)
+ v.SaveVolumeInfo()
+ }
- return e
-}
+ stats.VolumeServerVolumeCounter.WithLabelValues(v.Collection, "volume").Inc()
-func checkFile(filename string) (exists, canRead, canWrite bool, modTime time.Time, fileSize int64) {
- exists = true
- fi, err := os.Stat(filename)
- if os.IsNotExist(err) {
- exists = false
- return
- }
- if fi.Mode()&0400 != 0 {
- canRead = true
- }
- if fi.Mode()&0200 != 0 {
- canWrite = true
- }
- modTime = fi.ModTime()
- fileSize = fi.Size()
- return
+ return err
}
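
The permission probe moved out of this file into util.CheckFile; the removed checkFile body above shows its logic. A reconstruction as it might look in the util package, with one assumed change: a guard on all Stat errors, since the removed version would dereference a nil FileInfo when Stat failed with anything other than "not exist":

    package util

    import (
        "os"
        "time"
    )

    // CheckFile reports whether filename exists along with read/write
    // permission bits, modification time, and size. Reconstructed from the
    // checkFile body removed above; treating every Stat failure as absent
    // is an assumption of this sketch.
    func CheckFile(filename string) (exists, canRead, canWrite bool, modTime time.Time, fileSize int64) {
        fi, err := os.Stat(filename)
        if err != nil {
            return // exists stays false on any Stat failure
        }
        exists = true
        canRead = fi.Mode()&0400 != 0
        canWrite = fi.Mode()&0200 != 0
        modTime = fi.ModTime()
        fileSize = fi.Size()
        return
    }
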
diff --git a/weed/storage/volume_read_write.go b/weed/storage/volume_read_write.go
index 242325755..ac6154cef 100644
--- a/weed/storage/volume_read_write.go
+++ b/weed/storage/volume_read_write.go
@@ -11,6 +11,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/storage/backend"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
+ "github.com/chrislusf/seaweedfs/weed/storage/super_block"
. "github.com/chrislusf/seaweedfs/weed/storage/types"
)
@@ -45,22 +46,25 @@ func (v *Volume) Destroy() (err error) {
err = fmt.Errorf("volume %d is compacting", v.Id)
return
}
+ storageName, storageKey := v.RemoteStorageNameKey()
+ if v.HasRemoteFile() && storageName != "" && storageKey != "" {
+ if backendStorage, found := backend.BackendStorages[storageName]; found {
+ backendStorage.DeleteFile(storageKey)
+ }
+ }
v.Close()
os.Remove(v.FileName() + ".dat")
os.Remove(v.FileName() + ".idx")
+ os.Remove(v.FileName() + ".vif")
+ os.Remove(v.FileName() + ".sdx")
os.Remove(v.FileName() + ".cpd")
os.Remove(v.FileName() + ".cpx")
os.RemoveAll(v.FileName() + ".ldb")
- os.RemoveAll(v.FileName() + ".bdb")
return
}
func (v *Volume) writeNeedle(n *needle.Needle) (offset uint64, size uint32, isUnchanged bool, err error) {
- glog.V(4).Infof("writing needle %s", needle.NewFileIdFromNeedle(v.Id, n).String())
- if v.readOnly {
- err = fmt.Errorf("%s is read-only", v.DataBackend.String())
- return
- }
+ // glog.V(4).Infof("writing needle %s", needle.NewFileIdFromNeedle(v.Id, n).String())
v.dataFileAccessLock.Lock()
defer v.dataFileAccessLock.Unlock()
if v.isFileUnchanged(n) {
@@ -110,9 +114,6 @@ func (v *Volume) writeNeedle(n *needle.Needle) (offset uint64, size uint32, isUn
func (v *Volume) deleteNeedle(n *needle.Needle) (uint32, error) {
glog.V(4).Infof("delete needle %s", needle.NewFileIdFromNeedle(v.Id, n).String())
- if v.readOnly {
- return 0, fmt.Errorf("%s is read-only", v.DataBackend.String())
- }
v.dataFileAccessLock.Lock()
defer v.dataFileAccessLock.Unlock()
nv, ok := v.nm.Get(n.Id)
@@ -136,8 +137,8 @@ func (v *Volume) deleteNeedle(n *needle.Needle) (uint32, error) {
// read fills in Needle content by looking up n.Id from NeedleMapper
func (v *Volume) readNeedle(n *needle.Needle) (int, error) {
- v.dataFileAccessLock.Lock()
- defer v.dataFileAccessLock.Unlock()
+ v.dataFileAccessLock.RLock()
+ defer v.dataFileAccessLock.RUnlock()
nv, ok := v.nm.Get(n.Id)
if !ok || nv.Offset.IsZero() {
@@ -171,7 +172,7 @@ func (v *Volume) readNeedle(n *needle.Needle) (int, error) {
}
type VolumeFileScanner interface {
- VisitSuperBlock(SuperBlock) error
+ VisitSuperBlock(super_block.SuperBlock) error
ReadNeedleBody() bool
VisitNeedle(n *needle.Needle, offset int64, needleHeader, needleBody []byte) error
}
@@ -183,8 +184,10 @@ func ScanVolumeFile(dirname string, collection string, id needle.VolumeId,
if v, err = loadVolumeWithoutIndex(dirname, collection, id, needleMapKind); err != nil {
return fmt.Errorf("failed to load volume %d: %v", id, err)
}
- if err = volumeFileScanner.VisitSuperBlock(v.SuperBlock); err != nil {
- return fmt.Errorf("failed to process volume %d super block: %v", id, err)
+ if v.volumeInfo.Version == 0 {
+ if err = volumeFileScanner.VisitSuperBlock(v.SuperBlock); err != nil {
+ return fmt.Errorf("failed to process volume %d super block: %v", id, err)
+ }
}
defer v.Close()
@@ -195,13 +198,13 @@ func ScanVolumeFile(dirname string, collection string, id needle.VolumeId,
return ScanVolumeFileFrom(version, v.DataBackend, offset, volumeFileScanner)
}
-func ScanVolumeFileFrom(version needle.Version, datBackend backend.DataStorageBackend, offset int64, volumeFileScanner VolumeFileScanner) (err error) {
+func ScanVolumeFileFrom(version needle.Version, datBackend backend.BackendStorageFile, offset int64, volumeFileScanner VolumeFileScanner) (err error) {
n, nh, rest, e := needle.ReadNeedleHeader(datBackend, version, offset)
if e != nil {
if e == io.EOF {
return nil
}
- return fmt.Errorf("cannot read %s at offset %d: %v", datBackend.String(), offset, e)
+ return fmt.Errorf("cannot read %s at offset %d: %v", datBackend.Name(), offset, e)
}
for n != nil {
var needleBody []byte
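
The readNeedle change swaps the exclusive lock for a read lock, so concurrent reads no longer serialize each other. That only compiles if dataFileAccessLock is a sync.RWMutex, which the diff implies but does not show. A minimal sketch of the pattern with a toy volume type:

    package main

    import (
        "fmt"
        "sync"
    )

    type vol struct {
        dataFileAccessLock sync.RWMutex
        data               map[uint64][]byte
    }

    func (v *vol) read(id uint64) []byte {
        v.dataFileAccessLock.RLock() // many readers may hold this at once
        defer v.dataFileAccessLock.RUnlock()
        return v.data[id]
    }

    func (v *vol) write(id uint64, b []byte) {
        v.dataFileAccessLock.Lock() // writers still exclude everyone
        defer v.dataFileAccessLock.Unlock()
        v.data[id] = b
    }

    func main() {
        v := &vol{data: map[uint64][]byte{1: []byte("needle")}}
        fmt.Printf("%s\n", v.read(1))
    }
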
diff --git a/weed/storage/volume_super_block.go b/weed/storage/volume_super_block.go
index bce5af465..5e913e062 100644
--- a/weed/storage/volume_super_block.go
+++ b/weed/storage/volume_super_block.go
@@ -5,92 +5,29 @@ import (
"os"
"github.com/chrislusf/seaweedfs/weed/glog"
- "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
"github.com/chrislusf/seaweedfs/weed/storage/backend"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
- "github.com/chrislusf/seaweedfs/weed/util"
- "github.com/golang/protobuf/proto"
+ "github.com/chrislusf/seaweedfs/weed/storage/super_block"
)
-const (
- _SuperBlockSize = 8
-)
-
-/*
-* Super block currently has 8 bytes allocated for each volume.
-* Byte 0: version, 1 or 2
-* Byte 1: Replica Placement strategy, 000, 001, 002, 010, etc
-* Byte 2 and byte 3: Time to live. See TTL for definition
-* Byte 4 and byte 5: The number of times the volume has been compacted.
-* Rest bytes: Reserved
- */
-type SuperBlock struct {
- version needle.Version
- ReplicaPlacement *ReplicaPlacement
- Ttl *needle.TTL
- CompactionRevision uint16
- Extra *master_pb.SuperBlockExtra
- extraSize uint16
-}
-
-func (s *SuperBlock) BlockSize() int {
- switch s.version {
- case needle.Version2, needle.Version3:
- return _SuperBlockSize + int(s.extraSize)
- }
- return _SuperBlockSize
-}
-
-func (s *SuperBlock) Version() needle.Version {
- return s.version
-}
-func (s *SuperBlock) Bytes() []byte {
- header := make([]byte, _SuperBlockSize)
- header[0] = byte(s.version)
- header[1] = s.ReplicaPlacement.Byte()
- s.Ttl.ToBytes(header[2:4])
- util.Uint16toBytes(header[4:6], s.CompactionRevision)
-
- if s.Extra != nil {
- extraData, err := proto.Marshal(s.Extra)
- if err != nil {
- glog.Fatalf("cannot marshal super block extra %+v: %v", s.Extra, err)
- }
- extraSize := len(extraData)
- if extraSize > 256*256-2 {
- // reserve a couple of bits for future extension
- glog.Fatalf("super block extra size is %d bigger than %d", extraSize, 256*256-2)
- }
- s.extraSize = uint16(extraSize)
- util.Uint16toBytes(header[6:8], s.extraSize)
-
- header = append(header, extraData...)
- }
-
- return header
-}
-
-func (s *SuperBlock) Initialized() bool {
- return s.ReplicaPlacement != nil && s.Ttl != nil
-}
-
func (v *Volume) maybeWriteSuperBlock() error {
datSize, _, e := v.DataBackend.GetStat()
if e != nil {
- glog.V(0).Infof("failed to stat datafile %s: %v", v.DataBackend.String(), e)
+ glog.V(0).Infof("failed to stat datafile %s: %v", v.DataBackend.Name(), e)
return e
}
if datSize == 0 {
- v.SuperBlock.version = needle.CurrentVersion
+ v.SuperBlock.Version = needle.CurrentVersion
_, e = v.DataBackend.WriteAt(v.SuperBlock.Bytes(), 0)
if e != nil && os.IsPermission(e) {
//read-only, but zero length - recreate it!
var dataFile *os.File
- if dataFile, e = os.Create(v.DataBackend.String()); e == nil {
+ if dataFile, e = os.Create(v.DataBackend.Name()); e == nil {
v.DataBackend = backend.NewDiskFile(dataFile)
if _, e = v.DataBackend.WriteAt(v.SuperBlock.Bytes(), 0); e == nil {
- v.readOnly = false
+ v.noWriteOrDelete = false
+ v.noWriteCanDelete = false
}
}
}
@@ -99,38 +36,13 @@ func (v *Volume) maybeWriteSuperBlock() error {
}
func (v *Volume) readSuperBlock() (err error) {
- v.SuperBlock, err = ReadSuperBlock(v.DataBackend)
- return err
-}
-
-// ReadSuperBlock reads from data file and load it into volume's super block
-func ReadSuperBlock(datBackend backend.DataStorageBackend) (superBlock SuperBlock, err error) {
-
- header := make([]byte, _SuperBlockSize)
- if _, e := datBackend.ReadAt(header, 0); e != nil {
- err = fmt.Errorf("cannot read volume %s super block: %v", datBackend.String(), e)
- return
- }
-
- superBlock.version = needle.Version(header[0])
- if superBlock.ReplicaPlacement, err = NewReplicaPlacementFromByte(header[1]); err != nil {
- err = fmt.Errorf("cannot read replica type: %s", err.Error())
- return
- }
- superBlock.Ttl = needle.LoadTTLFromBytes(header[2:4])
- superBlock.CompactionRevision = util.BytesToUint16(header[4:6])
- superBlock.extraSize = util.BytesToUint16(header[6:8])
-
- if superBlock.extraSize > 0 {
- // read more
- extraData := make([]byte, int(superBlock.extraSize))
- superBlock.Extra = &master_pb.SuperBlockExtra{}
- err = proto.Unmarshal(extraData, superBlock.Extra)
- if err != nil {
- err = fmt.Errorf("cannot read volume %s super block extra: %v", datBackend.String(), err)
- return
+ v.SuperBlock, err = super_block.ReadSuperBlock(v.DataBackend)
+ if v.volumeInfo != nil && v.volumeInfo.Replication != "" {
+ if replication, err := super_block.NewReplicaPlacementFromString(v.volumeInfo.Replication); err != nil {
+			return fmt.Errorf("parse volume %d replication %s: %v", v.Id, v.volumeInfo.Replication, err)
+ } else {
+ v.SuperBlock.ReplicaPlacement = replication
}
}
-
- return
+ return err
}
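
The 8-byte super block layout itself, documented in the comment block that moved out of this file, is unchanged: byte 0 version, byte 1 replica placement, bytes 2-3 TTL, bytes 4-5 compaction revision, bytes 6-7 the extra-section size when an extra section follows. A standalone sketch of the fixed header encoding; big-endian is an assumption matching the util helpers, and the TTL is simplified to a raw uint16 (the real code uses needle.TTL):

    package main

    import (
        "encoding/binary"
        "fmt"
    )

    func superBlockHeader(version, replicaPlacement byte, ttl, compactRev, extraSize uint16) []byte {
        header := make([]byte, 8)
        header[0] = version
        header[1] = replicaPlacement
        binary.BigEndian.PutUint16(header[2:4], ttl)
        binary.BigEndian.PutUint16(header[4:6], compactRev)
        binary.BigEndian.PutUint16(header[6:8], extraSize)
        return header
    }

    func main() {
        fmt.Printf("% x\n", superBlockHeader(3, 1, 0, 1, 0))
    }
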
diff --git a/weed/storage/volume_tier.go b/weed/storage/volume_tier.go
new file mode 100644
index 000000000..fd7b08654
--- /dev/null
+++ b/weed/storage/volume_tier.go
@@ -0,0 +1,50 @@
+package storage
+
+import (
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb"
+ "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
+ "github.com/chrislusf/seaweedfs/weed/storage/backend"
+ _ "github.com/chrislusf/seaweedfs/weed/storage/backend/s3_backend"
+)
+
+func (v *Volume) GetVolumeInfo() *volume_server_pb.VolumeInfo {
+ return v.volumeInfo
+}
+
+func (v *Volume) maybeLoadVolumeInfo() (found bool) {
+
+ v.volumeInfo, v.hasRemoteFile, _ = pb.MaybeLoadVolumeInfo(v.FileName() + ".vif")
+
+ if v.hasRemoteFile {
+ glog.V(0).Infof("volume %d is tiered to %s as %s and read only", v.Id,
+ v.volumeInfo.Files[0].BackendName(), v.volumeInfo.Files[0].Key)
+ }
+
+ return
+
+}
+
+func (v *Volume) HasRemoteFile() bool {
+ return v.hasRemoteFile
+}
+
+func (v *Volume) LoadRemoteFile() error {
+ tierFile := v.volumeInfo.GetFiles()[0]
+ backendStorage := backend.BackendStorages[tierFile.BackendName()]
+
+ if v.DataBackend != nil {
+ v.DataBackend.Close()
+ }
+
+ v.DataBackend = backendStorage.NewStorageFile(tierFile.Key, v.volumeInfo)
+ return nil
+}
+
+func (v *Volume) SaveVolumeInfo() error {
+
+ tierFileName := v.FileName() + ".vif"
+
+ return pb.SaveVolumeInfo(tierFileName, v.volumeInfo)
+
+}
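
The blank import of s3_backend exists only for its side effect: the package is expected to register itself into the backend.BackendStorages map that Destroy and LoadRemoteFile look up by name. The diff only shows the lookups, so the registration below is a sketch of the conventional init pattern; the interface shape, the "s3" key, and the constructor are all assumptions:

    package main

    import "fmt"

    // stand-ins for backend.BackendStorage / backend.BackendStorages
    type BackendStorage interface {
        DeleteFile(key string) error
    }

    var BackendStorages = make(map[string]BackendStorage)

    type s3Backend struct{}

    func (s *s3Backend) DeleteFile(key string) error { return nil }

    // in the real tree this registration would live in the s3_backend
    // package's init, which is why the blank import alone suffices
    func init() {
        BackendStorages["s3"] = &s3Backend{}
    }

    func main() {
        if b, found := BackendStorages["s3"]; found {
            fmt.Println(b.DeleteFile("volumes/1.dat"))
        }
    }
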
diff --git a/weed/storage/volume_vacuum.go b/weed/storage/volume_vacuum.go
index e90746b54..5d0d63877 100644
--- a/weed/storage/volume_vacuum.go
+++ b/weed/storage/volume_vacuum.go
@@ -3,6 +3,7 @@ package storage
import (
"fmt"
"os"
+ "runtime"
"time"
"github.com/chrislusf/seaweedfs/weed/glog"
@@ -11,6 +12,7 @@ import (
idx2 "github.com/chrislusf/seaweedfs/weed/storage/idx"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
"github.com/chrislusf/seaweedfs/weed/storage/needle_map"
+ "github.com/chrislusf/seaweedfs/weed/storage/super_block"
. "github.com/chrislusf/seaweedfs/weed/storage/types"
"github.com/chrislusf/seaweedfs/weed/util"
)
@@ -19,101 +21,124 @@ func (v *Volume) garbageLevel() float64 {
if v.ContentSize() == 0 {
return 0
}
- return float64(v.DeletedSize()) / float64(v.ContentSize())
+ deletedSize := v.DeletedSize()
+ fileSize := v.ContentSize()
+ if v.DeletedCount() > 0 && v.DeletedSize() == 0 {
+ // this happens for .sdx converted back to normal .idx
+ // where deleted entry size is missing
+ datFileSize, _, _ := v.FileStat()
+ deletedSize = datFileSize - fileSize - super_block.SuperBlockSize
+ fileSize = datFileSize
+ }
+ return float64(deletedSize) / float64(fileSize)
}
+// compact a volume based on deletions in .dat files
func (v *Volume) Compact(preallocate int64, compactionBytePerSecond int64) error {
- if v.MemoryMapMaxSizeMb == 0 { //it makes no sense to compact in memory
- glog.V(3).Infof("Compacting volume %d ...", v.Id)
- //no need to lock for copy on write
- //v.accessLock.Lock()
- //defer v.accessLock.Unlock()
- //glog.V(3).Infof("Got Compaction lock...")
- v.isCompacting = true
- defer func() {
- v.isCompacting = false
- }()
-
- filePath := v.FileName()
- v.lastCompactIndexOffset = v.IndexFileSize()
- v.lastCompactRevision = v.SuperBlock.CompactionRevision
- glog.V(3).Infof("creating copies for volume %d ,last offset %d...", v.Id, v.lastCompactIndexOffset)
- return v.copyDataAndGenerateIndexFile(filePath+".cpd", filePath+".cpx", preallocate, compactionBytePerSecond)
- } else {
+ if v.MemoryMapMaxSizeMb != 0 { //it makes no sense to compact in memory
return nil
}
+ glog.V(3).Infof("Compacting volume %d ...", v.Id)
+ //no need to lock for copy on write
+ //v.accessLock.Lock()
+ //defer v.accessLock.Unlock()
+ //glog.V(3).Infof("Got Compaction lock...")
+ v.isCompacting = true
+ defer func() {
+ v.isCompacting = false
+ }()
+
+ filePath := v.FileName()
+ v.lastCompactIndexOffset = v.IndexFileSize()
+ v.lastCompactRevision = v.SuperBlock.CompactionRevision
+	glog.V(3).Infof("creating copies for volume %d, last offset %d...", v.Id, v.lastCompactIndexOffset)
+ return v.copyDataAndGenerateIndexFile(filePath+".cpd", filePath+".cpx", preallocate, compactionBytePerSecond)
}
-func (v *Volume) Compact2() error {
-
- if v.MemoryMapMaxSizeMb == 0 { //it makes no sense to compact in memory
- glog.V(3).Infof("Compact2 volume %d ...", v.Id)
+// compact a volume based on deletions in .idx files
+func (v *Volume) Compact2(preallocate int64) error {
- v.isCompacting = true
- defer func() {
- v.isCompacting = false
- }()
-
- filePath := v.FileName()
- glog.V(3).Infof("creating copies for volume %d ...", v.Id)
- return v.copyDataBasedOnIndexFile(filePath+".cpd", filePath+".cpx")
- } else {
+ if v.MemoryMapMaxSizeMb != 0 { //it makes no sense to compact in memory
return nil
}
+ glog.V(3).Infof("Compact2 volume %d ...", v.Id)
+
+ v.isCompacting = true
+ defer func() {
+ v.isCompacting = false
+ }()
+
+ filePath := v.FileName()
+ v.lastCompactIndexOffset = v.IndexFileSize()
+ v.lastCompactRevision = v.SuperBlock.CompactionRevision
+ glog.V(3).Infof("creating copies for volume %d ...", v.Id)
+ return copyDataBasedOnIndexFile(filePath+".dat", filePath+".idx", filePath+".cpd", filePath+".cpx", v.SuperBlock, v.Version(), preallocate)
}
func (v *Volume) CommitCompact() error {
- if v.MemoryMapMaxSizeMb == 0 { //it makes no sense to compact in memory
- glog.V(0).Infof("Committing volume %d vacuuming...", v.Id)
+ if v.MemoryMapMaxSizeMb != 0 { //it makes no sense to compact in memory
+ return nil
+ }
+ glog.V(0).Infof("Committing volume %d vacuuming...", v.Id)
- v.isCompacting = true
- defer func() {
- v.isCompacting = false
- }()
+ v.isCompacting = true
+ defer func() {
+ v.isCompacting = false
+ }()
- v.dataFileAccessLock.Lock()
- defer v.dataFileAccessLock.Unlock()
+ v.dataFileAccessLock.Lock()
+ defer v.dataFileAccessLock.Unlock()
- glog.V(3).Infof("Got volume %d committing lock...", v.Id)
- v.nm.Close()
+ glog.V(3).Infof("Got volume %d committing lock...", v.Id)
+ v.nm.Close()
+ if v.DataBackend != nil {
if err := v.DataBackend.Close(); err != nil {
glog.V(0).Infof("fail to close volume %d", v.Id)
}
- v.DataBackend = nil
- stats.VolumeServerVolumeCounter.WithLabelValues(v.Collection, "volume").Dec()
-
- var e error
- if e = v.makeupDiff(v.FileName()+".cpd", v.FileName()+".cpx", v.FileName()+".dat", v.FileName()+".idx"); e != nil {
- glog.V(0).Infof("makeupDiff in CommitCompact volume %d failed %v", v.Id, e)
- e = os.Remove(v.FileName() + ".cpd")
+ }
+ v.DataBackend = nil
+ stats.VolumeServerVolumeCounter.WithLabelValues(v.Collection, "volume").Dec()
+
+ var e error
+ if e = v.makeupDiff(v.FileName()+".cpd", v.FileName()+".cpx", v.FileName()+".dat", v.FileName()+".idx"); e != nil {
+ glog.V(0).Infof("makeupDiff in CommitCompact volume %d failed %v", v.Id, e)
+ e = os.Remove(v.FileName() + ".cpd")
+ if e != nil {
+ return e
+ }
+ e = os.Remove(v.FileName() + ".cpx")
+ if e != nil {
+ return e
+ }
+ } else {
+ if runtime.GOOS == "windows" {
+ e = os.RemoveAll(v.FileName() + ".dat")
if e != nil {
return e
}
- e = os.Remove(v.FileName() + ".cpx")
+ e = os.RemoveAll(v.FileName() + ".idx")
if e != nil {
return e
}
- } else {
- var e error
- if e = os.Rename(v.FileName()+".cpd", v.FileName()+".dat"); e != nil {
- return fmt.Errorf("rename %s: %v", v.FileName()+".cpd", e)
- }
- if e = os.Rename(v.FileName()+".cpx", v.FileName()+".idx"); e != nil {
- return fmt.Errorf("rename %s: %v", v.FileName()+".cpx", e)
- }
}
+ var e error
+ if e = os.Rename(v.FileName()+".cpd", v.FileName()+".dat"); e != nil {
+ return fmt.Errorf("rename %s: %v", v.FileName()+".cpd", e)
+ }
+ if e = os.Rename(v.FileName()+".cpx", v.FileName()+".idx"); e != nil {
+ return fmt.Errorf("rename %s: %v", v.FileName()+".cpx", e)
+ }
+ }
- //glog.V(3).Infof("Pretending to be vacuuming...")
- //time.Sleep(20 * time.Second)
+ //glog.V(3).Infof("Pretending to be vacuuming...")
+ //time.Sleep(20 * time.Second)
- os.RemoveAll(v.FileName() + ".ldb")
- os.RemoveAll(v.FileName() + ".bdb")
+ os.RemoveAll(v.FileName() + ".ldb")
- glog.V(3).Infof("Loading volume %d commit file...", v.Id)
- if e = v.load(true, false, v.needleMapKind, 0); e != nil {
- return e
- }
+ glog.V(3).Infof("Loading volume %d commit file...", v.Id)
+ if e = v.load(true, false, v.needleMapKind, 0); e != nil {
+ return e
}
return nil
}
@@ -132,14 +157,15 @@ func (v *Volume) cleanupCompact() error {
return nil
}
-func fetchCompactRevisionFromDatFile(datBackend backend.DataStorageBackend) (compactRevision uint16, err error) {
- superBlock, err := ReadSuperBlock(datBackend)
+func fetchCompactRevisionFromDatFile(datBackend backend.BackendStorageFile) (compactRevision uint16, err error) {
+ superBlock, err := super_block.ReadSuperBlock(datBackend)
if err != nil {
return 0, err
}
return superBlock.CompactionRevision, nil
}
+// if old .dat and .idx files are updated, this func tries to apply the same changes to new files accordingly
func (v *Volume) makeupDiff(newDatFileName, newIdxFileName, oldDatFileName, oldIdxFileName string) (err error) {
var indexSize int64
@@ -150,6 +176,7 @@ func (v *Volume) makeupDiff(newDatFileName, newIdxFileName, oldDatFileName, oldI
oldDatBackend := backend.NewDiskFile(oldDatFile)
defer oldDatBackend.Close()
+ // skip if the old .idx file has not changed
if indexSize, err = verifyIndexFileIntegrity(oldIdxFile); err != nil {
return fmt.Errorf("verifyIndexFileIntegrity %s failed: %v", oldIdxFileName, err)
}
@@ -157,6 +184,7 @@ func (v *Volume) makeupDiff(newDatFileName, newIdxFileName, oldDatFileName, oldI
return nil
}
+ // fail if the old .dat file has changed to a new revision
oldDatCompactRevision, err := fetchCompactRevisionFromDatFile(oldDatBackend)
if err != nil {
return fmt.Errorf("fetchCompactRevisionFromDatFile src %s failed: %v", oldDatFile.Name(), err)
@@ -270,15 +298,15 @@ func (v *Volume) makeupDiff(newDatFileName, newIdxFileName, oldDatFileName, oldI
type VolumeFileScanner4Vacuum struct {
version needle.Version
v *Volume
- dstBackend backend.DataStorageBackend
- nm *NeedleMap
+ dstBackend backend.BackendStorageFile
+ nm *needle_map.MemDb
newOffset int64
now uint64
writeThrottler *util.WriteThrottler
}
-func (scanner *VolumeFileScanner4Vacuum) VisitSuperBlock(superBlock SuperBlock) error {
- scanner.version = superBlock.Version()
+func (scanner *VolumeFileScanner4Vacuum) VisitSuperBlock(superBlock super_block.SuperBlock) error {
+ scanner.version = superBlock.Version
superBlock.CompactionRevision++
_, err := scanner.dstBackend.WriteAt(superBlock.Bytes(), 0)
scanner.newOffset = int64(superBlock.BlockSize())
@@ -296,7 +324,7 @@ func (scanner *VolumeFileScanner4Vacuum) VisitNeedle(n *needle.Needle, offset in
nv, ok := scanner.v.nm.Get(n.Id)
glog.V(4).Infoln("needle expected offset ", offset, "ok", ok, "nv", nv)
if ok && nv.Offset.ToAcutalOffset() == offset && nv.Size > 0 && nv.Size != TombstoneFileSize {
- if err := scanner.nm.Put(n.Id, ToOffset(scanner.newOffset), n.Size); err != nil {
+ if err := scanner.nm.Set(n.Id, ToOffset(scanner.newOffset), n.Size); err != nil {
return fmt.Errorf("cannot put needle: %s", err)
}
if _, _, _, err := n.Append(scanner.dstBackend, scanner.v.Version()); err != nil {
@@ -312,90 +340,92 @@ func (scanner *VolumeFileScanner4Vacuum) VisitNeedle(n *needle.Needle, offset in
func (v *Volume) copyDataAndGenerateIndexFile(dstName, idxName string, preallocate int64, compactionBytePerSecond int64) (err error) {
var (
- dst backend.DataStorageBackend
- idx *os.File
+ dst backend.BackendStorageFile
)
if dst, err = createVolumeFile(dstName, preallocate, 0); err != nil {
return
}
defer dst.Close()
- if idx, err = os.OpenFile(idxName, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644); err != nil {
- return
- }
- defer idx.Close()
+ nm := needle_map.NewMemDb()
+ defer nm.Close()
scanner := &VolumeFileScanner4Vacuum{
v: v,
now: uint64(time.Now().Unix()),
- nm: NewBtreeNeedleMap(idx),
+ nm: nm,
dstBackend: dst,
writeThrottler: util.NewWriteThrottler(compactionBytePerSecond),
}
err = ScanVolumeFile(v.dir, v.Collection, v.Id, v.needleMapKind, scanner)
+ if err != nil {
+		return err
+ }
+
+ err = nm.SaveToIdx(idxName)
return
}
-func (v *Volume) copyDataBasedOnIndexFile(dstName, idxName string) (err error) {
+func copyDataBasedOnIndexFile(srcDatName, srcIdxName, dstDatName, datIdxName string, sb super_block.SuperBlock, version needle.Version, preallocate int64) (err error) {
var (
- dst, idx, oldIndexFile *os.File
+ srcDatBackend, dstDatBackend backend.BackendStorageFile
+ dataFile *os.File
)
- if dst, err = os.OpenFile(dstName, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644); err != nil {
+ if dstDatBackend, err = createVolumeFile(dstDatName, preallocate, 0); err != nil {
return
}
- dstDatBackend := backend.NewDiskFile(dst)
defer dstDatBackend.Close()
- if idx, err = os.OpenFile(idxName, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644); err != nil {
+ oldNm := needle_map.NewMemDb()
+ defer oldNm.Close()
+ newNm := needle_map.NewMemDb()
+ defer newNm.Close()
+ if err = oldNm.LoadFromIdx(srcIdxName); err != nil {
return
}
- defer idx.Close()
-
- if oldIndexFile, err = os.OpenFile(v.FileName()+".idx", os.O_RDONLY, 0644); err != nil {
- return
+ if dataFile, err = os.Open(srcDatName); err != nil {
+ return err
}
- defer oldIndexFile.Close()
+ srcDatBackend = backend.NewDiskFile(dataFile)
+ defer srcDatBackend.Close()
- nm := NewBtreeNeedleMap(idx)
now := uint64(time.Now().Unix())
- v.SuperBlock.CompactionRevision++
- dst.Write(v.SuperBlock.Bytes())
- newOffset := int64(v.SuperBlock.BlockSize())
+ sb.CompactionRevision++
+ dstDatBackend.WriteAt(sb.Bytes(), 0)
+ newOffset := int64(sb.BlockSize())
- idx2.WalkIndexFile(oldIndexFile, func(key NeedleId, offset Offset, size uint32) error {
- if offset.IsZero() || size == TombstoneFileSize {
- return nil
- }
+	err = oldNm.AscendingVisit(func(value needle_map.NeedleValue) error {
+
+ offset, size := value.Offset, value.Size
- nv, ok := v.nm.Get(key)
- if !ok {
+ if offset.IsZero() || size == TombstoneFileSize {
return nil
}
n := new(needle.Needle)
- err := n.ReadData(v.DataBackend, offset.ToAcutalOffset(), size, v.Version())
+ err := n.ReadData(srcDatBackend, offset.ToAcutalOffset(), size, version)
if err != nil {
return nil
}
- if n.HasTtl() && now >= n.LastModified+uint64(v.Ttl.Minutes()*60) {
+ if n.HasTtl() && now >= n.LastModified+uint64(sb.Ttl.Minutes()*60) {
return nil
}
- glog.V(4).Infoln("needle expected offset ", offset, "ok", ok, "nv", nv)
- if nv.Offset == offset && nv.Size > 0 {
- if err = nm.Put(n.Id, ToOffset(newOffset), n.Size); err != nil {
- return fmt.Errorf("cannot put needle: %s", err)
- }
- if _, _, _, err = n.Append(dstDatBackend, v.Version()); err != nil {
- return fmt.Errorf("cannot append needle: %s", err)
- }
- newOffset += n.DiskSize(v.Version())
- glog.V(3).Infoln("saving key", n.Id, "volume offset", offset, "=>", newOffset, "data_size", n.Size)
+ if err = newNm.Set(n.Id, ToOffset(newOffset), n.Size); err != nil {
+ return fmt.Errorf("cannot put needle: %s", err)
+ }
+ if _, _, _, err = n.Append(dstDatBackend, sb.Version); err != nil {
+ return fmt.Errorf("cannot append needle: %s", err)
}
+ newOffset += n.DiskSize(version)
+ glog.V(4).Infoln("saving key", n.Id, "volume offset", offset, "=>", newOffset, "data_size", n.Size)
+
return nil
})
+	if err != nil {
+		return err
+	}
+	err = newNm.SaveToIdx(datIdxName)
+
return
}
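
The rewritten copyDataBasedOnIndexFile no longer streams Put calls into an open .idx file; it loads the old index into a MemDb, replays only live entries into the new .dat, and serializes the surviving offsets once at the end. A skeleton of that flow with the needle copy elided; NeedleValue.Key is assumed (the diff itself re-reads the id from the needle), and AscendingVisit is assumed to return the visitor's error. Every other name appears in the diff above:

    package storage

    import (
        "github.com/chrislusf/seaweedfs/weed/storage/needle_map"
        . "github.com/chrislusf/seaweedfs/weed/storage/types"
    )

    func rewriteLiveEntries(srcIdxName, dstIdxName string) error {
        oldNm := needle_map.NewMemDb()
        defer oldNm.Close()
        newNm := needle_map.NewMemDb()
        defer newNm.Close()

        if err := oldNm.LoadFromIdx(srcIdxName); err != nil {
            return err
        }
        newOffset := int64(8) // the real code uses sb.BlockSize() for the extra section
        err := oldNm.AscendingVisit(func(value needle_map.NeedleValue) error {
            if value.Offset.IsZero() || value.Size == TombstoneFileSize {
                return nil // skip deleted entries; this is the whole point
            }
            // ... read the needle from the old .dat, append it to the new .dat,
            // then advance newOffset by n.DiskSize(version) ...
            return newNm.Set(value.Key, ToOffset(newOffset), value.Size)
        })
        if err != nil {
            return err
        }
        return newNm.SaveToIdx(dstIdxName)
    }
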
diff --git a/weed/storage/volume_vacuum_test.go b/weed/storage/volume_vacuum_test.go
index ba1e59f2c..95f43d6ec 100644
--- a/weed/storage/volume_vacuum_test.go
+++ b/weed/storage/volume_vacuum_test.go
@@ -8,6 +8,7 @@ import (
"time"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
+ "github.com/chrislusf/seaweedfs/weed/storage/super_block"
"github.com/chrislusf/seaweedfs/weed/storage/types"
)
@@ -46,7 +47,7 @@ func TestMakeDiff(t *testing.T) {
v := new(Volume)
//lastCompactIndexOffset value is the index file size before step 4
v.lastCompactIndexOffset = 96
- v.SuperBlock.version = 0x2
+ v.SuperBlock.Version = 0x2
/*
err := v.makeupDiff(
"/yourpath/1.cpd",
@@ -68,7 +69,7 @@ func TestCompaction(t *testing.T) {
}
defer os.RemoveAll(dir) // clean up
- v, err := NewVolume(dir, "", 1, NeedleMapInMemory, &ReplicaPlacement{}, &needle.TTL{}, 0, 0)
+ v, err := NewVolume(dir, "", 1, NeedleMapInMemory, &super_block.ReplicaPlacement{}, &needle.TTL{}, 0, 0)
if err != nil {
t.Fatalf("volume creation: %v", err)
}
@@ -83,7 +84,7 @@ func TestCompaction(t *testing.T) {
}
startTime := time.Now()
- v.Compact(0, 1024*1024)
+ v.Compact2(0)
speed := float64(v.ContentSize()) / time.Now().Sub(startTime).Seconds()
t.Logf("compaction speed: %.2f bytes/s", speed)