author     bingoohuang <bingoo.huang@gmail.com>  2021-04-26 17:19:35 +0800
committer  bingoohuang <bingoo.huang@gmail.com>  2021-04-26 17:19:35 +0800
commit     d861cbd81b75b6684c971ac00e33685e6575b833 (patch)
tree       301805fef4aa5d0096bfb1510536f7a009b661e7 /weed/storage
parent     70da715d8d917527291b35fb069fac077d17b868 (diff)
parent     4ee58922eff61a5a4ca29c0b4829b097a498549e (diff)
download   seaweedfs-d861cbd81b75b6684c971ac00e33685e6575b833.tar.xz
           seaweedfs-d861cbd81b75b6684c971ac00e33685e6575b833.zip
Merge branch 'master' of https://github.com/bingoohuang/seaweedfs
Diffstat (limited to 'weed/storage')
-rw-r--r--  weed/storage/backend/backend.go | 25
-rw-r--r--  weed/storage/backend/disk_file.go | 46
-rw-r--r--  weed/storage/backend/memory_map/memory_map_backend.go | 8
-rw-r--r--  weed/storage/backend/s3_backend/s3_backend.go | 24
-rw-r--r--  weed/storage/backend/s3_backend/s3_sessions.go | 6
-rw-r--r--  weed/storage/backend/volume_create.go | 20
-rw-r--r--  weed/storage/backend/volume_create_linux.go (renamed from weed/storage/volume_create_linux.go) | 9
-rw-r--r--  weed/storage/backend/volume_create_windows.go (renamed from weed/storage/volume_create_windows.go) | 7
-rw-r--r--  weed/storage/disk_location.go | 224
-rw-r--r--  weed/storage/disk_location_ec.go | 24
-rw-r--r--  weed/storage/erasure_coding/389.ecx | bin 0 -> 7761568 bytes
-rw-r--r--  weed/storage/erasure_coding/ec_decoder.go | 20
-rw-r--r--  weed/storage/erasure_coding/ec_encoder.go | 24
-rw-r--r--  weed/storage/erasure_coding/ec_locate.go | 10
-rw-r--r--  weed/storage/erasure_coding/ec_shard.go | 14
-rw-r--r--  weed/storage/erasure_coding/ec_test.go | 22
-rw-r--r--  weed/storage/erasure_coding/ec_volume.go | 62
-rw-r--r--  weed/storage/erasure_coding/ec_volume_delete.go | 2
-rw-r--r--  weed/storage/erasure_coding/ec_volume_info.go | 6
-rw-r--r--  weed/storage/erasure_coding/ec_volume_test.go | 54
-rw-r--r--  weed/storage/idx/walk.go | 17
-rw-r--r--  weed/storage/needle/async_request.go | 53
-rw-r--r--  weed/storage/needle/crc.go | 25
-rw-r--r--  weed/storage/needle/file_id.go | 2
-rw-r--r--  weed/storage/needle/needle.go | 69
-rw-r--r--  weed/storage/needle/needle_parse_multipart.go | 109
-rw-r--r--  weed/storage/needle/needle_parse_upload.go | 201
-rw-r--r--  weed/storage/needle/needle_read_write.go | 101
-rw-r--r--  weed/storage/needle/needle_read_write_test.go | 2
-rw-r--r--  weed/storage/needle/volume_ttl.go | 48
-rw-r--r--  weed/storage/needle/volume_ttl_test.go | 7
-rw-r--r--  weed/storage/needle_map.go | 42
-rw-r--r--  weed/storage/needle_map/compact_map.go | 27
-rw-r--r--  weed/storage/needle_map/compact_map_perf_test.go | 5
-rw-r--r--  weed/storage/needle_map/compact_map_test.go | 20
-rw-r--r--  weed/storage/needle_map/memdb.go | 27
-rw-r--r--  weed/storage/needle_map/memdb_test.go | 23
-rw-r--r--  weed/storage/needle_map/needle_value.go | 6
-rw-r--r--  weed/storage/needle_map/needle_value_map.go | 4
-rw-r--r--  weed/storage/needle_map_leveldb.go | 61
-rw-r--r--  weed/storage/needle_map_memory.go | 17
-rw-r--r--  weed/storage/needle_map_metric.go | 18
-rw-r--r--  weed/storage/needle_map_metric_test.go | 2
-rw-r--r--  weed/storage/needle_map_sorted_file.go | 22
-rw-r--r--  weed/storage/store.go | 302
-rw-r--r--  weed/storage/store_ec.go | 52
-rw-r--r--  weed/storage/store_ec_delete.go | 20
-rw-r--r--  weed/storage/store_vacuum.go | 7
-rw-r--r--  weed/storage/super_block/replica_placement.go | 9
-rw-r--r--  weed/storage/types/needle_types.go | 23
-rw-r--r--  weed/storage/types/offset_4bytes.go | 6
-rw-r--r--  weed/storage/types/offset_5bytes.go | 6
-rw-r--r--  weed/storage/types/volume_disk_type.go | 40
-rw-r--r--  weed/storage/volume.go | 106
-rw-r--r--  weed/storage/volume_backup.go | 50
-rw-r--r--  weed/storage/volume_checking.go | 127
-rw-r--r--  weed/storage/volume_create.go | 21
-rw-r--r--  weed/storage/volume_info.go | 4
-rw-r--r--  weed/storage/volume_loading.go | 110
-rw-r--r--  weed/storage/volume_read.go | 131
-rw-r--r--  weed/storage/volume_read_write.go | 237
-rw-r--r--  weed/storage/volume_stream_write.go | 104
-rw-r--r--  weed/storage/volume_super_block.go | 10
-rw-r--r--  weed/storage/volume_tier.go | 18
-rw-r--r--  weed/storage/volume_vacuum.go | 147
-rw-r--r--  weed/storage/volume_vacuum_test.go | 16
-rw-r--r--  weed/storage/volume_write.go | 327
67 files changed, 2394 insertions(+), 994 deletions(-)
diff --git a/weed/storage/backend/backend.go b/weed/storage/backend/backend.go
index 6ea850543..2dc61d02e 100644
--- a/weed/storage/backend/backend.go
+++ b/weed/storage/backend/backend.go
@@ -1,6 +1,7 @@
package backend
import (
+ "github.com/chrislusf/seaweedfs/weed/util"
"io"
"os"
"strings"
@@ -9,7 +10,6 @@ import (
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
- "github.com/spf13/viper"
)
type BackendStorageFile interface {
@@ -19,6 +19,7 @@ type BackendStorageFile interface {
io.Closer
GetStat() (datSize int64, modTime time.Time, err error)
Name() string
+ Sync() error
}
type BackendStorage interface {
@@ -35,7 +36,7 @@ type StringProperties interface {
type StorageType string
type BackendStorageFactory interface {
StorageType() StorageType
- BuildStorage(configuration StringProperties, id string) (BackendStorage, error)
+ BuildStorage(configuration StringProperties, configPrefix string, id string) (BackendStorage, error)
}
var (
@@ -44,23 +45,24 @@ var (
)
// used by master to load remote storage configurations
-func LoadConfiguration(config *viper.Viper) {
+func LoadConfiguration(config *util.ViperProxy) {
StorageBackendPrefix := "storage.backend"
- backendSub := config.Sub(StorageBackendPrefix)
-
for backendTypeName := range config.GetStringMap(StorageBackendPrefix) {
backendStorageFactory, found := BackendStorageFactories[StorageType(backendTypeName)]
if !found {
glog.Fatalf("backend storage type %s not found", backendTypeName)
}
- backendTypeSub := backendSub.Sub(backendTypeName)
- for backendStorageId := range backendSub.GetStringMap(backendTypeName) {
- if !backendTypeSub.GetBool(backendStorageId + ".enabled") {
+ for backendStorageId := range config.GetStringMap(StorageBackendPrefix + "." + backendTypeName) {
+ if !config.GetBool(StorageBackendPrefix + "." + backendTypeName + "." + backendStorageId + ".enabled") {
+ continue
+ }
+ if _, found := BackendStorages[backendTypeName+"."+backendStorageId]; found {
continue
}
- backendStorage, buildErr := backendStorageFactory.BuildStorage(backendTypeSub.Sub(backendStorageId), backendStorageId)
+ backendStorage, buildErr := backendStorageFactory.BuildStorage(config,
+ StorageBackendPrefix+"."+backendTypeName+"."+backendStorageId+".", backendStorageId)
if buildErr != nil {
glog.Fatalf("fail to create backend storage %s.%s", backendTypeName, backendStorageId)
}
@@ -82,7 +84,10 @@ func LoadFromPbStorageBackends(storageBackends []*master_pb.StorageBackend) {
glog.Warningf("storage type %s not found", storageBackend.Type)
continue
}
- backendStorage, buildErr := backendStorageFactory.BuildStorage(newProperties(storageBackend.Properties), storageBackend.Id)
+ if _, found := BackendStorages[storageBackend.Type+"."+storageBackend.Id]; found {
+ continue
+ }
+ backendStorage, buildErr := backendStorageFactory.BuildStorage(newProperties(storageBackend.Properties), "", storageBackend.Id)
if buildErr != nil {
glog.Fatalf("fail to create backend storage %s.%s", storageBackend.Type, storageBackend.Id)
}
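
The refactor above replaces viper Sub() trees with flat, prefix-qualified keys, so the same lookup works whether the configuration comes from the master's TOML file or from protobuf-delivered storage backends (which pass an empty prefix). A minimal sketch of the prefix-based lookup, with a toy mapProperties standing in for the real configuration source:

```go
package main

import "fmt"

// StringProperties mirrors the interface in the diff; mapProperties is a
// stand-in used only for this illustration.
type StringProperties interface {
	GetString(key string) string
}

type mapProperties map[string]string

func (m mapProperties) GetString(key string) string { return m[key] }

func main() {
	var config StringProperties = mapProperties{
		"storage.backend.s3.default.enabled": "true",
		"storage.backend.s3.default.region":  "us-east-1",
		"storage.backend.s3.default.bucket":  "weed-volumes",
	}
	// BuildStorage now receives the shared config plus a fully qualified
	// prefix, instead of a viper sub-tree.
	prefix := "storage.backend.s3.default."
	fmt.Println(config.GetString(prefix + "region")) // us-east-1
	fmt.Println(config.GetString(prefix + "bucket")) // weed-volumes
}
```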
diff --git a/weed/storage/backend/disk_file.go b/weed/storage/backend/disk_file.go
index c4b3caffb..3b42429cf 100644
--- a/weed/storage/backend/disk_file.go
+++ b/weed/storage/backend/disk_file.go
@@ -1,6 +1,8 @@
package backend
import (
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ . "github.com/chrislusf/seaweedfs/weed/storage/types"
"os"
"time"
)
@@ -12,12 +14,25 @@ var (
type DiskFile struct {
File *os.File
fullFilePath string
+ fileSize int64
+ modTime time.Time
}
func NewDiskFile(f *os.File) *DiskFile {
+ stat, err := f.Stat()
+ if err != nil {
+ glog.Fatalf("stat file %s: %v", f.Name(), err)
+ }
+ offset := stat.Size()
+ if offset%NeedlePaddingSize != 0 {
+ offset = offset + (NeedlePaddingSize - offset%NeedlePaddingSize)
+ }
+
return &DiskFile{
fullFilePath: f.Name(),
File: f,
+ fileSize: offset,
+ modTime: stat.ModTime(),
}
}
@@ -26,11 +41,28 @@ func (df *DiskFile) ReadAt(p []byte, off int64) (n int, err error) {
}
func (df *DiskFile) WriteAt(p []byte, off int64) (n int, err error) {
- return df.File.WriteAt(p, off)
+ n, err = df.File.WriteAt(p, off)
+ if err == nil {
+ waterMark := off + int64(n)
+ if waterMark > df.fileSize {
+ df.fileSize = waterMark
+ df.modTime = time.Now()
+ }
+ }
+ return
+}
+
+func (df *DiskFile) Write(p []byte) (n int, err error) {
+ return df.WriteAt(p, df.fileSize)
}
func (df *DiskFile) Truncate(off int64) error {
- return df.File.Truncate(off)
+ err := df.File.Truncate(off)
+ if err == nil {
+ df.fileSize = off
+ df.modTime = time.Now()
+ }
+ return err
}
func (df *DiskFile) Close() error {
@@ -38,13 +70,13 @@ func (df *DiskFile) Close() error {
}
func (df *DiskFile) GetStat() (datSize int64, modTime time.Time, err error) {
- stat, e := df.File.Stat()
- if e == nil {
- return stat.Size(), stat.ModTime(), nil
- }
- return 0, time.Time{}, err
+ return df.fileSize, df.modTime, nil
}
func (df *DiskFile) Name() string {
return df.fullFilePath
}
+
+func (df *DiskFile) Sync() error {
+ return df.File.Sync()
+}
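
DiskFile now caches a write watermark (fileSize, modTime), so GetStat no longer issues a stat(2) per call, and the initial watermark is rounded up to the needle alignment boundary. A sketch of that rounding arithmetic, hard-coding NeedlePaddingSize = 8 as defined in weed/storage/types:

```go
package main

import "fmt"

const NeedlePaddingSize = 8 // needle records are 8-byte aligned on disk

// roundUpToPadding mirrors the watermark initialization in NewDiskFile.
func roundUpToPadding(offset int64) int64 {
	if offset%NeedlePaddingSize != 0 {
		offset = offset + (NeedlePaddingSize - offset%NeedlePaddingSize)
	}
	return offset
}

func main() {
	for _, size := range []int64{0, 1, 8, 13} {
		fmt.Printf("file size %d -> watermark %d\n", size, roundUpToPadding(size))
	}
	// file size 0 -> watermark 0
	// file size 1 -> watermark 8
	// file size 8 -> watermark 8
	// file size 13 -> watermark 16
}
```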
diff --git a/weed/storage/backend/memory_map/memory_map_backend.go b/weed/storage/backend/memory_map/memory_map_backend.go
index 03e7308d0..8ff03d9af 100644
--- a/weed/storage/backend/memory_map/memory_map_backend.go
+++ b/weed/storage/backend/memory_map/memory_map_backend.go
@@ -3,12 +3,10 @@ package memory_map
import (
"os"
"time"
-
- "github.com/chrislusf/seaweedfs/weed/storage/backend"
)
var (
- _ backend.BackendStorageFile = &MemoryMappedFile{}
+// _ backend.BackendStorageFile = &MemoryMappedFile{} // remove this to break import cycle
)
type MemoryMappedFile struct {
@@ -58,3 +56,7 @@ func (mmf *MemoryMappedFile) GetStat() (datSize int64, modTime time.Time, err er
func (mmf *MemoryMappedFile) Name() string {
return mmf.mm.File.Name()
}
+
+func (mm *MemoryMappedFile) Sync() error {
+ return nil
+}
diff --git a/weed/storage/backend/s3_backend/s3_backend.go b/weed/storage/backend/s3_backend/s3_backend.go
index 9f03cfa81..4706c9334 100644
--- a/weed/storage/backend/s3_backend/s3_backend.go
+++ b/weed/storage/backend/s3_backend/s3_backend.go
@@ -26,8 +26,8 @@ type S3BackendFactory struct {
func (factory *S3BackendFactory) StorageType() backend.StorageType {
return backend.StorageType("s3")
}
-func (factory *S3BackendFactory) BuildStorage(configuration backend.StringProperties, id string) (backend.BackendStorage, error) {
- return newS3BackendStorage(configuration, id)
+func (factory *S3BackendFactory) BuildStorage(configuration backend.StringProperties, configPrefix string, id string) (backend.BackendStorage, error) {
+ return newS3BackendStorage(configuration, configPrefix, id)
}
type S3BackendStorage struct {
@@ -36,17 +36,20 @@ type S3BackendStorage struct {
aws_secret_access_key string
region string
bucket string
+ endpoint string
conn s3iface.S3API
}
-func newS3BackendStorage(configuration backend.StringProperties, id string) (s *S3BackendStorage, err error) {
+func newS3BackendStorage(configuration backend.StringProperties, configPrefix string, id string) (s *S3BackendStorage, err error) {
s = &S3BackendStorage{}
s.id = id
- s.aws_access_key_id = configuration.GetString("aws_access_key_id")
- s.aws_secret_access_key = configuration.GetString("aws_secret_access_key")
- s.region = configuration.GetString("region")
- s.bucket = configuration.GetString("bucket")
- s.conn, err = createSession(s.aws_access_key_id, s.aws_secret_access_key, s.region)
+ s.aws_access_key_id = configuration.GetString(configPrefix + "aws_access_key_id")
+ s.aws_secret_access_key = configuration.GetString(configPrefix + "aws_secret_access_key")
+ s.region = configuration.GetString(configPrefix + "region")
+ s.bucket = configuration.GetString(configPrefix + "bucket")
+ s.endpoint = configuration.GetString(configPrefix + "endpoint")
+
+ s.conn, err = createSession(s.aws_access_key_id, s.aws_secret_access_key, s.region, s.endpoint)
glog.V(0).Infof("created backend storage s3.%s for region %s bucket %s", s.id, s.region, s.bucket)
return
@@ -58,6 +61,7 @@ func (s *S3BackendStorage) ToProperties() map[string]string {
m["aws_secret_access_key"] = s.aws_secret_access_key
m["region"] = s.region
m["bucket"] = s.bucket
+ m["endpoint"] = s.endpoint
return m
}
@@ -175,3 +179,7 @@ func (s3backendStorageFile S3BackendStorageFile) GetStat() (datSize int64, modTi
func (s3backendStorageFile S3BackendStorageFile) Name() string {
return s3backendStorageFile.key
}
+
+func (s3backendStorageFile S3BackendStorageFile) Sync() error {
+ return nil
+}
diff --git a/weed/storage/backend/s3_backend/s3_sessions.go b/weed/storage/backend/s3_backend/s3_sessions.go
index 5fdbcb66b..b8378c379 100644
--- a/weed/storage/backend/s3_backend/s3_sessions.go
+++ b/weed/storage/backend/s3_backend/s3_sessions.go
@@ -24,7 +24,7 @@ func getSession(region string) (s3iface.S3API, bool) {
return sess, found
}
-func createSession(awsAccessKeyId, awsSecretAccessKey, region string) (s3iface.S3API, error) {
+func createSession(awsAccessKeyId, awsSecretAccessKey, region, endpoint string) (s3iface.S3API, error) {
sessionsLock.Lock()
defer sessionsLock.Unlock()
@@ -34,7 +34,9 @@ func createSession(awsAccessKeyId, awsSecretAccessKey, region string) (s3iface.S
}
config := &aws.Config{
- Region: aws.String(region),
+ Region: aws.String(region),
+ Endpoint: aws.String(endpoint),
+ S3ForcePathStyle: aws.Bool(true),
}
if awsAccessKeyId != "" && awsSecretAccessKey != "" {
config.Credentials = credentials.NewStaticCredentials(awsAccessKeyId, awsSecretAccessKey, "")
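
A custom endpoint only works with most non-AWS gateways (MinIO, Ceph RGW, etc.) when path-style addressing is forced, since virtual-hosted-style URLs would prepend the bucket to a host that does not resolve. A sketch of the resulting session setup with aws-sdk-go; the endpoint and credentials below are placeholders:

```go
package main

import (
	"fmt"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/credentials"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/s3"
)

func main() {
	config := &aws.Config{
		Region:           aws.String("us-east-1"),
		Endpoint:         aws.String("http://localhost:9000"), // e.g. a MinIO gateway
		S3ForcePathStyle: aws.Bool(true),                      // bucket in the path, not the host
		Credentials:      credentials.NewStaticCredentials("key", "secret", ""),
	}
	sess, err := session.NewSession(config)
	if err != nil {
		fmt.Println("create session:", err)
		return
	}
	svc := s3.New(sess)
	_ = svc // ready for PutObject / GetObject calls
}
```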
diff --git a/weed/storage/backend/volume_create.go b/weed/storage/backend/volume_create.go
new file mode 100644
index 000000000..d4bd8e40f
--- /dev/null
+++ b/weed/storage/backend/volume_create.go
@@ -0,0 +1,20 @@
+// +build !linux,!windows
+
+package backend
+
+import (
+ "os"
+
+ "github.com/chrislusf/seaweedfs/weed/glog"
+)
+
+func CreateVolumeFile(fileName string, preallocate int64, memoryMapSizeMB uint32) (BackendStorageFile, error) {
+ file, e := os.OpenFile(fileName, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0644)
+ if e != nil {
+ return nil, e
+ }
+ if preallocate > 0 {
+ glog.V(2).Infof("Preallocated disk space for %s is not supported", fileName)
+ }
+ return NewDiskFile(file), nil
+}
diff --git a/weed/storage/volume_create_linux.go b/weed/storage/backend/volume_create_linux.go
index ee599ac32..260c2c2a3 100644
--- a/weed/storage/volume_create_linux.go
+++ b/weed/storage/backend/volume_create_linux.go
@@ -1,23 +1,22 @@
// +build linux
-package storage
+package backend
import (
"os"
"syscall"
"github.com/chrislusf/seaweedfs/weed/glog"
- "github.com/chrislusf/seaweedfs/weed/storage/backend"
)
-func createVolumeFile(fileName string, preallocate int64, memoryMapSizeMB uint32) (backend.BackendStorageFile, error) {
+func CreateVolumeFile(fileName string, preallocate int64, memoryMapSizeMB uint32) (BackendStorageFile, error) {
file, e := os.OpenFile(fileName, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0644)
if e != nil {
return nil, e
}
if preallocate != 0 {
syscall.Fallocate(int(file.Fd()), 1, 0, preallocate)
- glog.V(0).Infof("Preallocated %d bytes disk space for %s", preallocate, fileName)
+ glog.V(1).Infof("Preallocated %d bytes disk space for %s", preallocate, fileName)
}
- return backend.NewDiskFile(file), nil
+ return NewDiskFile(file), nil
}
diff --git a/weed/storage/volume_create_windows.go b/weed/storage/backend/volume_create_windows.go
index e1c0b961f..7d40ec0d7 100644
--- a/weed/storage/volume_create_windows.go
+++ b/weed/storage/backend/volume_create_windows.go
@@ -1,17 +1,16 @@
// +build windows
-package storage
+package backend
import (
"github.com/chrislusf/seaweedfs/weed/storage/backend/memory_map"
"golang.org/x/sys/windows"
"github.com/chrislusf/seaweedfs/weed/glog"
- "github.com/chrislusf/seaweedfs/weed/storage/backend"
"github.com/chrislusf/seaweedfs/weed/storage/backend/memory_map/os_overloads"
)
-func createVolumeFile(fileName string, preallocate int64, memoryMapSizeMB uint32) (backend.BackendStorageFile, error) {
+func CreateVolumeFile(fileName string, preallocate int64, memoryMapSizeMB uint32) (BackendStorageFile, error) {
if preallocate > 0 {
glog.V(0).Infof("Preallocated disk space for %s is not supported", fileName)
}
@@ -27,7 +26,7 @@ func createVolumeFile(fileName string, preallocate int64, memoryMapSizeMB uint32
if e != nil {
return nil, e
}
- return backend.NewDiskFile(file), nil
+ return NewDiskFile(file), nil
}
}
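
With the three files above, platform selection happens entirely at compile time via build tags: the generic fallback for non-Linux, non-Windows targets, fallocate preallocation on Linux, and optional memory mapping on Windows. A usage sketch against the relocated constructor, assuming this commit's backend package is importable; the file path is illustrative:

```go
package main

import (
	"log"

	"github.com/chrislusf/seaweedfs/weed/storage/backend"
)

func main() {
	// No preallocation, no memory mapping; the platform-specific
	// implementation is chosen by build tags at compile time.
	f, err := backend.CreateVolumeFile("/tmp/1.dat", 0, 0)
	if err != nil {
		log.Fatal(err)
	}
	defer f.Close()
	if _, err := f.WriteAt([]byte("hello"), 0); err != nil {
		log.Fatal(err)
	}
	if err := f.Sync(); err != nil { // Sync is new on BackendStorageFile in this commit
		log.Fatal(err)
	}
}
```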
diff --git a/weed/storage/disk_location.go b/weed/storage/disk_location.go
index e116fc715..ed4e00312 100644
--- a/weed/storage/disk_location.go
+++ b/weed/storage/disk_location.go
@@ -1,45 +1,68 @@
package storage
import (
+ "fmt"
+ "github.com/chrislusf/seaweedfs/weed/storage/types"
"io/ioutil"
"os"
+ "path/filepath"
"strings"
"sync"
-
- "fmt"
+ "time"
"github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/stats"
"github.com/chrislusf/seaweedfs/weed/storage/erasure_coding"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
+ "github.com/chrislusf/seaweedfs/weed/util"
)
type DiskLocation struct {
- Directory string
- MaxVolumeCount int
- volumes map[needle.VolumeId]*Volume
- volumesLock sync.RWMutex
+ Directory string
+ IdxDirectory string
+ DiskType types.DiskType
+ MaxVolumeCount int
+ OriginalMaxVolumeCount int
+ MinFreeSpacePercent float32
+ volumes map[needle.VolumeId]*Volume
+ volumesLock sync.RWMutex
// erasure coding
ecVolumes map[needle.VolumeId]*erasure_coding.EcVolume
ecVolumesLock sync.RWMutex
+
+ isDiskSpaceLow bool
}
-func NewDiskLocation(dir string, maxVolumeCount int) *DiskLocation {
- location := &DiskLocation{Directory: dir, MaxVolumeCount: maxVolumeCount}
+func NewDiskLocation(dir string, maxVolumeCount int, minFreeSpacePercent float32, idxDir string, diskType types.DiskType) *DiskLocation {
+ dir = util.ResolvePath(dir)
+ if idxDir == "" {
+ idxDir = dir
+ } else {
+ idxDir = util.ResolvePath(idxDir)
+ }
+ location := &DiskLocation{
+ Directory: dir,
+ IdxDirectory: idxDir,
+ DiskType: diskType,
+ MaxVolumeCount: maxVolumeCount,
+ OriginalMaxVolumeCount: maxVolumeCount,
+ MinFreeSpacePercent: minFreeSpacePercent,
+ }
location.volumes = make(map[needle.VolumeId]*Volume)
location.ecVolumes = make(map[needle.VolumeId]*erasure_coding.EcVolume)
+ go location.CheckDiskSpace()
return location
}
-func (l *DiskLocation) volumeIdFromPath(dir os.FileInfo) (needle.VolumeId, string, error) {
- name := dir.Name()
- if !dir.IsDir() && strings.HasSuffix(name, ".idx") {
- base := name[:len(name)-len(".idx")]
+func volumeIdFromFileName(filename string) (needle.VolumeId, string, error) {
+ if isValidVolume(filename) {
+ base := filename[:len(filename)-4]
collection, volumeId, err := parseCollectionVolumeId(base)
return volumeId, collection, err
}
- return 0, "", fmt.Errorf("Path is not a volume: %s", name)
+ return 0, "", fmt.Errorf("file is not a volume: %s", filename)
}
func parseCollectionVolumeId(base string) (collection string, vid needle.VolumeId, err error) {
@@ -51,38 +74,83 @@ func parseCollectionVolumeId(base string) (collection string, vid needle.VolumeI
return collection, vol, err
}
-func (l *DiskLocation) loadExistingVolume(fileInfo os.FileInfo, needleMapKind NeedleMapType) {
- name := fileInfo.Name()
- if !fileInfo.IsDir() && strings.HasSuffix(name, ".idx") {
- vid, collection, err := l.volumeIdFromPath(fileInfo)
- if err == nil {
- l.volumesLock.RLock()
- _, found := l.volumes[vid]
- l.volumesLock.RUnlock()
- if !found {
- if v, e := NewVolume(l.Directory, collection, vid, needleMapKind, nil, nil, 0, 0); e == nil {
- l.volumesLock.Lock()
- l.volumes[vid] = v
- l.volumesLock.Unlock()
- size, _, _ := v.FileStat()
- glog.V(0).Infof("data file %s, replicaPlacement=%s v=%d size=%d ttl=%s",
- l.Directory+"/"+name, v.ReplicaPlacement, v.Version(), size, v.Ttl.String())
- // println("volume", vid, "last append at", v.lastAppendAtNs)
- } else {
- glog.V(0).Infof("new volume %s error %s", name, e)
- }
- }
- }
+func isValidVolume(basename string) bool {
+ return strings.HasSuffix(basename, ".idx") || strings.HasSuffix(basename, ".vif")
+}
+
+func getValidVolumeName(basename string) string {
+ if isValidVolume(basename) {
+ return basename[:len(basename)-4]
}
+ return ""
}
-func (l *DiskLocation) concurrentLoadingVolumes(needleMapKind NeedleMapType, concurrency int) {
+func (l *DiskLocation) loadExistingVolume(fileInfo os.FileInfo, needleMapKind NeedleMapKind) bool {
+ basename := fileInfo.Name()
+ if fileInfo.IsDir() {
+ return false
+ }
+ volumeName := getValidVolumeName(basename)
+ if volumeName == "" {
+ return false
+ }
+
+ // check for incomplete volume
+ noteFile := l.Directory + "/" + volumeName + ".note"
+ if util.FileExists(noteFile) {
+ note, _ := ioutil.ReadFile(noteFile)
+ glog.Warningf("volume %s was not completed: %s", volumeName, string(note))
+ removeVolumeFiles(l.Directory + "/" + volumeName)
+ removeVolumeFiles(l.IdxDirectory + "/" + volumeName)
+ return false
+ }
+
+ // parse out collection, volume id
+ vid, collection, err := volumeIdFromFileName(basename)
+ if err != nil {
+ glog.Warningf("get volume id failed, %s, err : %s", volumeName, err)
+ return false
+ }
+
+ // avoid loading one volume more than once
+ l.volumesLock.RLock()
+ _, found := l.volumes[vid]
+ l.volumesLock.RUnlock()
+ if found {
+ glog.V(1).Infof("loaded volume, %v", vid)
+ return true
+ }
+
+ // load the volume
+ v, e := NewVolume(l.Directory, l.IdxDirectory, collection, vid, needleMapKind, nil, nil, 0, 0)
+ if e != nil {
+ glog.V(0).Infof("new volume %s error %s", volumeName, e)
+ return false
+ }
+
+ l.SetVolume(vid, v)
+
+ size, _, _ := v.FileStat()
+ glog.V(0).Infof("data file %s, replication=%s v=%d size=%d ttl=%s",
+ l.Directory+"/"+volumeName+".dat", v.ReplicaPlacement, v.Version(), size, v.Ttl.String())
+ return true
+}
+
+func (l *DiskLocation) concurrentLoadingVolumes(needleMapKind NeedleMapKind, concurrency int) {
task_queue := make(chan os.FileInfo, 10*concurrency)
go func() {
- if dirs, err := ioutil.ReadDir(l.Directory); err == nil {
- for _, dir := range dirs {
- task_queue <- dir
+ foundVolumeNames := make(map[string]bool)
+ if fileInfos, err := ioutil.ReadDir(l.Directory); err == nil {
+ for _, fi := range fileInfos {
+ volumeName := getValidVolumeName(fi.Name())
+ if volumeName == "" {
+ continue
+ }
+ if _, found := foundVolumeNames[volumeName]; !found {
+ foundVolumeNames[volumeName] = true
+ task_queue <- fi
+ }
}
}
close(task_queue)
@@ -93,8 +161,8 @@ func (l *DiskLocation) concurrentLoadingVolumes(needleMapKind NeedleMapType, con
wg.Add(1)
go func() {
defer wg.Done()
- for dir := range task_queue {
- l.loadExistingVolume(dir, needleMapKind)
+ for fi := range task_queue {
+ _ = l.loadExistingVolume(fi, needleMapKind)
}
}()
}
@@ -102,7 +170,7 @@ func (l *DiskLocation) concurrentLoadingVolumes(needleMapKind NeedleMapType, con
}
-func (l *DiskLocation) loadExistingVolumes(needleMapKind NeedleMapType) {
+func (l *DiskLocation) loadExistingVolumes(needleMapKind NeedleMapKind) {
l.concurrentLoadingVolumes(needleMapKind, 10)
glog.V(0).Infof("Store started on dir: %s with %d volumes max %d", l.Directory, len(l.volumes), l.MaxVolumeCount)
@@ -158,7 +226,7 @@ func (l *DiskLocation) DeleteCollectionFromDiskLocation(collection string) (e er
return
}
-func (l *DiskLocation) deleteVolumeById(vid needle.VolumeId) (e error) {
+func (l *DiskLocation) deleteVolumeById(vid needle.VolumeId) (found bool, e error) {
v, ok := l.volumes[vid]
if !ok {
return
@@ -167,21 +235,15 @@ func (l *DiskLocation) deleteVolumeById(vid needle.VolumeId) (e error) {
if e != nil {
return
}
+ found = true
delete(l.volumes, vid)
return
}
-func (l *DiskLocation) LoadVolume(vid needle.VolumeId, needleMapKind NeedleMapType) bool {
- if fileInfos, err := ioutil.ReadDir(l.Directory); err == nil {
- for _, fileInfo := range fileInfos {
- volId, _, err := l.volumeIdFromPath(fileInfo)
- if vid == volId && err == nil {
- l.loadExistingVolume(fileInfo, needleMapKind)
- return true
- }
- }
+func (l *DiskLocation) LoadVolume(vid needle.VolumeId, needleMapKind NeedleMapKind) bool {
+ if fileInfo, found := l.LocateVolume(vid); found {
+ return l.loadExistingVolume(fileInfo, needleMapKind)
}
-
return false
}
@@ -193,7 +255,8 @@ func (l *DiskLocation) DeleteVolume(vid needle.VolumeId) error {
if !ok {
return fmt.Errorf("Volume not found, VolumeId: %d", vid)
}
- return l.deleteVolumeById(vid)
+ _, err := l.deleteVolumeById(vid)
+ return err
}
func (l *DiskLocation) UnloadVolume(vid needle.VolumeId) error {
@@ -217,7 +280,7 @@ func (l *DiskLocation) unmountVolumeByCollection(collectionName string) map[need
}
}
- for k, _ := range deltaVols {
+ for k := range deltaVols {
delete(l.volumes, k)
}
return deltaVols
@@ -228,6 +291,7 @@ func (l *DiskLocation) SetVolume(vid needle.VolumeId, volume *Volume) {
defer l.volumesLock.Unlock()
l.volumes[vid] = volume
+ volume.location = l
}
func (l *DiskLocation) FindVolume(vid needle.VolumeId) (*Volume, bool) {
@@ -260,3 +324,53 @@ func (l *DiskLocation) Close() {
return
}
+
+func (l *DiskLocation) LocateVolume(vid needle.VolumeId) (os.FileInfo, bool) {
+ if fileInfos, err := ioutil.ReadDir(l.Directory); err == nil {
+ for _, fileInfo := range fileInfos {
+ volId, _, err := volumeIdFromFileName(fileInfo.Name())
+ if vid == volId && err == nil {
+ return fileInfo, true
+ }
+ }
+ }
+
+ return nil, false
+}
+
+func (l *DiskLocation) UnUsedSpace(volumeSizeLimit uint64) (unUsedSpace uint64) {
+
+ l.volumesLock.RLock()
+ defer l.volumesLock.RUnlock()
+
+ for _, vol := range l.volumes {
+ if vol.IsReadOnly() {
+ continue
+ }
+ datSize, idxSize, _ := vol.FileStat()
+ unUsedSpace += volumeSizeLimit - (datSize + idxSize)
+ }
+
+ return
+}
+
+func (l *DiskLocation) CheckDiskSpace() {
+ for {
+ if dir, e := filepath.Abs(l.Directory); e == nil {
+ s := stats.NewDiskStatus(dir)
+ stats.VolumeServerResourceGauge.WithLabelValues(l.Directory, "all").Set(float64(s.All))
+ stats.VolumeServerResourceGauge.WithLabelValues(l.Directory, "used").Set(float64(s.Used))
+ stats.VolumeServerResourceGauge.WithLabelValues(l.Directory, "free").Set(float64(s.Free))
+ if (s.PercentFree < l.MinFreeSpacePercent) != l.isDiskSpaceLow {
+ l.isDiskSpaceLow = !l.isDiskSpaceLow
+ }
+ if l.isDiskSpaceLow {
+ glog.V(0).Infof("dir %s freePercent %.2f%% < min %.2f%%, isLowDiskSpace: %v", dir, s.PercentFree, l.MinFreeSpacePercent, l.isDiskSpaceLow)
+ } else {
+ glog.V(4).Infof("dir %s freePercent %.2f%% < min %.2f%%, isLowDiskSpace: %v", dir, s.PercentFree, l.MinFreeSpacePercent, l.isDiskSpaceLow)
+ }
+ }
+ time.Sleep(time.Minute)
+ }
+
+}
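
CheckDiskSpace samples the filesystem once a minute, exports the gauges, and flips isDiskSpaceLow only when the free-space percentage crosses MinFreeSpacePercent, logging loudly only while space is low. A self-contained sketch of that toggle logic, with made-up free-space samples:

```go
package main

import "fmt"

func main() {
	minFreePercent := float32(10)
	isLow := false
	// The flag flips only when the condition crosses the threshold,
	// so each state transition is observable exactly once.
	for _, percentFree := range []float32{25, 9.5, 8, 12} {
		if (percentFree < minFreePercent) != isLow {
			isLow = !isLow
			fmt.Printf("state change: freePercent %.1f%% -> isLowDiskSpace=%v\n", percentFree, isLow)
		}
	}
}
```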
diff --git a/weed/storage/disk_location_ec.go b/weed/storage/disk_location_ec.go
index f6c44e966..91c7d86a6 100644
--- a/weed/storage/disk_location_ec.go
+++ b/weed/storage/disk_location_ec.go
@@ -3,6 +3,7 @@ package storage
import (
"fmt"
"io/ioutil"
+ "os"
"path"
"regexp"
"sort"
@@ -13,7 +14,7 @@ import (
)
var (
- re = regexp.MustCompile("\\.ec[0-9][0-9]")
+ re = regexp.MustCompile(`\.ec[0-9][0-9]`)
)
func (l *DiskLocation) FindEcVolume(vid needle.VolumeId) (*erasure_coding.EcVolume, bool) {
@@ -56,15 +57,18 @@ func (l *DiskLocation) FindEcShard(vid needle.VolumeId, shardId erasure_coding.S
func (l *DiskLocation) LoadEcShard(collection string, vid needle.VolumeId, shardId erasure_coding.ShardId) (err error) {
- ecVolumeShard, err := erasure_coding.NewEcVolumeShard(l.Directory, collection, vid, shardId)
+ ecVolumeShard, err := erasure_coding.NewEcVolumeShard(l.DiskType, l.Directory, collection, vid, shardId)
if err != nil {
+ if err == os.ErrNotExist {
+ return os.ErrNotExist
+ }
return fmt.Errorf("failed to create ec shard %d.%d: %v", vid, shardId, err)
}
l.ecVolumesLock.Lock()
defer l.ecVolumesLock.Unlock()
ecVolume, found := l.ecVolumes[vid]
if !found {
- ecVolume, err = erasure_coding.NewEcVolume(l.Directory, collection, vid)
+ ecVolume, err = erasure_coding.NewEcVolume(l.DiskType, l.Directory, l.IdxDirectory, collection, vid)
if err != nil {
return fmt.Errorf("failed to create ec volume %d: %v", vid, err)
}
@@ -118,6 +122,13 @@ func (l *DiskLocation) loadAllEcShards() (err error) {
if err != nil {
return fmt.Errorf("load all ec shards in dir %s: %v", l.Directory, err)
}
+ if l.IdxDirectory != l.Directory {
+ indexFileInfos, err := ioutil.ReadDir(l.IdxDirectory)
+ if err != nil {
+ return fmt.Errorf("load all ec shards in dir %s: %v", l.IdxDirectory, err)
+ }
+ fileInfos = append(fileInfos, indexFileInfos...)
+ }
sort.Slice(fileInfos, func(i, j int) bool {
return fileInfos[i].Name() < fileInfos[j].Name()
@@ -183,3 +194,10 @@ func (l *DiskLocation) unmountEcVolumeByCollection(collectionName string) map[ne
}
return deltaVols
}
+
+func (l *DiskLocation) EcVolumesLen() int {
+ l.ecVolumesLock.RLock()
+ defer l.ecVolumesLock.RUnlock()
+
+ return len(l.ecVolumes)
+}
diff --git a/weed/storage/erasure_coding/389.ecx b/weed/storage/erasure_coding/389.ecx
new file mode 100644
index 000000000..158781920
--- /dev/null
+++ b/weed/storage/erasure_coding/389.ecx
Binary files differ
diff --git a/weed/storage/erasure_coding/ec_decoder.go b/weed/storage/erasure_coding/ec_decoder.go
index ae77cee3f..47d3c6550 100644
--- a/weed/storage/erasure_coding/ec_decoder.go
+++ b/weed/storage/erasure_coding/ec_decoder.go
@@ -11,6 +11,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/storage/needle_map"
"github.com/chrislusf/seaweedfs/weed/storage/super_block"
"github.com/chrislusf/seaweedfs/weed/storage/types"
+ "github.com/chrislusf/seaweedfs/weed/util"
)
// write .idx file from .ecx and .ecj files
@@ -44,20 +45,20 @@ func WriteIdxFileFromEcIndex(baseFileName string) (err error) {
// FindDatFileSize calculate .dat file size from max offset entry
// there may be extra deletions after that entry
// but they are deletions anyway
-func FindDatFileSize(baseFileName string) (datSize int64, err error) {
+func FindDatFileSize(dataBaseFileName, indexBaseFileName string) (datSize int64, err error) {
- version, err := readEcVolumeVersion(baseFileName)
+ version, err := readEcVolumeVersion(dataBaseFileName)
if err != nil {
- return 0, fmt.Errorf("read ec volume %s version: %v", baseFileName, err)
+ return 0, fmt.Errorf("read ec volume %s version: %v", dataBaseFileName, err)
}
- err = iterateEcxFile(baseFileName, func(key types.NeedleId, offset types.Offset, size uint32) error {
+ err = iterateEcxFile(indexBaseFileName, func(key types.NeedleId, offset types.Offset, size types.Size) error {
- if size == types.TombstoneFileSize {
+ if size.IsDeleted() {
return nil
}
- entryStopOffset := offset.ToAcutalOffset() + needle.GetActualSize(size, version)
+ entryStopOffset := offset.ToActualOffset() + needle.GetActualSize(size, version)
if datSize < entryStopOffset {
datSize = entryStopOffset
}
@@ -87,7 +88,7 @@ func readEcVolumeVersion(baseFileName string) (version needle.Version, err error
}
-func iterateEcxFile(baseFileName string, processNeedleFn func(key types.NeedleId, offset types.Offset, size uint32) error) error {
+func iterateEcxFile(baseFileName string, processNeedleFn func(key types.NeedleId, offset types.Offset, size types.Size) error) error {
ecxFile, openErr := os.OpenFile(baseFileName+".ecx", os.O_RDONLY, 0644)
if openErr != nil {
return fmt.Errorf("cannot open ec index %s.ecx: %v", baseFileName, openErr)
@@ -118,9 +119,12 @@ func iterateEcxFile(baseFileName string, processNeedleFn func(key types.NeedleId
}
func iterateEcjFile(baseFileName string, processNeedleFn func(key types.NeedleId) error) error {
+ if !util.FileExists(baseFileName + ".ecj") {
+ return nil
+ }
ecjFile, openErr := os.OpenFile(baseFileName+".ecj", os.O_RDONLY, 0644)
if openErr != nil {
- return fmt.Errorf("cannot open ec index %s.ecx: %v", baseFileName, openErr)
+ return fmt.Errorf("cannot open ec index %s.ecj: %v", baseFileName, openErr)
}
defer ecjFile.Close()
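
FindDatFileSize reconstructs the original .dat size as the farthest stop offset (offset plus actual size) over all live index entries, skipping tombstones. A simplified stand-alone sketch of that scan; the entry layout below is a toy stand-in for the real .ecx records:

```go
package main

import "fmt"

type entry struct {
	offset int64
	size   int64 // actual on-disk size; negative marks a deletion here
}

func findDatFileSize(entries []entry) (datSize int64) {
	for _, e := range entries {
		if e.size < 0 { // deleted entries contribute nothing
			continue
		}
		if stop := e.offset + e.size; stop > datSize {
			datSize = stop
		}
	}
	return
}

func main() {
	fmt.Println(findDatFileSize([]entry{{0, 128}, {128, 64}, {192, -1}})) // 192
}
```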
diff --git a/weed/storage/erasure_coding/ec_encoder.go b/weed/storage/erasure_coding/ec_encoder.go
index eeb384b91..34b639407 100644
--- a/weed/storage/erasure_coding/ec_encoder.go
+++ b/weed/storage/erasure_coding/ec_encoder.go
@@ -5,12 +5,13 @@ import (
"io"
"os"
+ "github.com/klauspost/reedsolomon"
+
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/storage/idx"
"github.com/chrislusf/seaweedfs/weed/storage/needle_map"
"github.com/chrislusf/seaweedfs/weed/storage/types"
"github.com/chrislusf/seaweedfs/weed/util"
- "github.com/klauspost/reedsolomon"
)
const (
@@ -25,9 +26,12 @@ const (
// all keys are sorted in ascending order
func WriteSortedFileFromIdx(baseFileName string, ext string) (e error) {
- cm, err := readCompactMap(baseFileName)
+ nm, err := readNeedleMap(baseFileName)
+ if nm != nil {
+ defer nm.Close()
+ }
if err != nil {
- return fmt.Errorf("readCompactMap: %v", err)
+ return fmt.Errorf("readNeedleMap: %v", err)
}
ecxFile, err := os.OpenFile(baseFileName+ext, os.O_TRUNC|os.O_CREATE|os.O_WRONLY, 0644)
@@ -36,7 +40,7 @@ func WriteSortedFileFromIdx(baseFileName string, ext string) (e error) {
}
defer ecxFile.Close()
- err = cm.AscendingVisit(func(value needle_map.NeedleValue) error {
+ err = nm.AscendingVisit(func(value needle_map.NeedleValue) error {
bytes := value.ToBytes()
_, writeErr := ecxFile.Write(bytes)
return writeErr
@@ -73,6 +77,8 @@ func generateEcFiles(baseFileName string, bufferSize int, largeBlockSize int64,
if err != nil {
return fmt.Errorf("failed to stat dat file: %v", err)
}
+
+ glog.V(0).Infof("encodeDatFile %s.dat size:%d", baseFileName, fi.Size())
err = encodeDatFile(fi.Size(), err, baseFileName, bufferSize, largeBlockSize, file, smallBlockSize)
if err != nil {
return fmt.Errorf("encodeDatFile: %v", err)
@@ -195,7 +201,7 @@ func encodeDatFile(remainingSize int64, err error, baseFileName string, bufferSi
}
buffers := make([][]byte, TotalShardsCount)
- for i, _ := range buffers {
+ for i := range buffers {
buffers[i] = make([]byte, bufferSize)
}
@@ -232,7 +238,7 @@ func rebuildEcFiles(shardHasData []bool, inputFiles []*os.File, outputFiles []*o
}
buffers := make([][]byte, TotalShardsCount)
- for i, _ := range buffers {
+ for i := range buffers {
if shardHasData[i] {
buffers[i] = make([]byte, ErasureCodingSmallBlockSize)
}
@@ -280,15 +286,15 @@ func rebuildEcFiles(shardHasData []bool, inputFiles []*os.File, outputFiles []*o
}
-func readCompactMap(baseFileName string) (*needle_map.CompactMap, error) {
+func readNeedleMap(baseFileName string) (*needle_map.MemDb, error) {
indexFile, err := os.OpenFile(baseFileName+".idx", os.O_RDONLY, 0644)
if err != nil {
return nil, fmt.Errorf("cannot read Volume Index %s.idx: %v", baseFileName, err)
}
defer indexFile.Close()
- cm := needle_map.NewCompactMap()
- err = idx.WalkIndexFile(indexFile, func(key types.NeedleId, offset types.Offset, size uint32) error {
+ cm := needle_map.NewMemDb()
+ err = idx.WalkIndexFile(indexFile, func(key types.NeedleId, offset types.Offset, size types.Size) error {
if !offset.IsZero() && size != types.TombstoneFileSize {
cm.Set(key, offset, size)
} else {
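
The sorted .ecx file is produced by visiting the index entries in ascending key order (AscendingVisit), now backed by needle_map.MemDb instead of the in-memory CompactMap. A toy sketch of that sort-then-write step, with a plain map and sort.Slice standing in for MemDb:

```go
package main

import (
	"fmt"
	"sort"
)

func main() {
	index := map[uint64]string{3: "c", 1: "a", 2: "b"}

	keys := make([]uint64, 0, len(index))
	for k := range index {
		keys = append(keys, k)
	}
	sort.Slice(keys, func(i, j int) bool { return keys[i] < keys[j] })

	// AscendingVisit equivalent: append each entry to the output in key order.
	for _, k := range keys {
		fmt.Printf("write entry key=%d value=%s\n", k, index[k])
	}
}
```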
diff --git a/weed/storage/erasure_coding/ec_locate.go b/weed/storage/erasure_coding/ec_locate.go
index 562966f8f..19eba6235 100644
--- a/weed/storage/erasure_coding/ec_locate.go
+++ b/weed/storage/erasure_coding/ec_locate.go
@@ -1,14 +1,18 @@
package erasure_coding
+import (
+ "github.com/chrislusf/seaweedfs/weed/storage/types"
+)
+
type Interval struct {
BlockIndex int
InnerBlockOffset int64
- Size uint32
+ Size types.Size
IsLargeBlock bool
LargeBlockRowsCount int
}
-func LocateData(largeBlockLength, smallBlockLength int64, datSize int64, offset int64, size uint32) (intervals []Interval) {
+func LocateData(largeBlockLength, smallBlockLength int64, datSize int64, offset int64, size types.Size) (intervals []Interval) {
blockIndex, isLargeBlock, innerBlockOffset := locateOffset(largeBlockLength, smallBlockLength, datSize, offset)
// adding DataShardsCount*smallBlockLength to ensure we can derive the number of large block size from a shard size
@@ -32,7 +36,7 @@ func LocateData(largeBlockLength, smallBlockLength int64, datSize int64, offset
intervals = append(intervals, interval)
return
}
- interval.Size = uint32(blockRemaining)
+ interval.Size = types.Size(blockRemaining)
intervals = append(intervals, interval)
size -= interval.Size
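
LocateData cuts a contiguous byte range of the logical .dat file at block boundaries, so each piece maps to exactly one erasure-coding block. A simplified sketch of that interval splitting with a single toy block size (the real code distinguishes large and small blocks):

```go
package main

import "fmt"

type interval struct {
	blockIndex int
	innerOff   int64
	size       int64
}

func locate(blockSize, offset, size int64) (intervals []interval) {
	for size > 0 {
		idx := int(offset / blockSize)
		inner := offset % blockSize
		n := blockSize - inner // bytes left in this block
		if n > size {
			n = size
		}
		intervals = append(intervals, interval{idx, inner, n})
		offset += n
		size -= n
	}
	return
}

func main() {
	fmt.Println(locate(1024, 1000, 100)) // [{0 1000 24} {1 0 76}]
}
```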
diff --git a/weed/storage/erasure_coding/ec_shard.go b/weed/storage/erasure_coding/ec_shard.go
index 47e6d3d1e..2a57d85ef 100644
--- a/weed/storage/erasure_coding/ec_shard.go
+++ b/weed/storage/erasure_coding/ec_shard.go
@@ -2,9 +2,11 @@ package erasure_coding
import (
"fmt"
+ "github.com/chrislusf/seaweedfs/weed/storage/types"
"os"
"path"
"strconv"
+ "strings"
"github.com/chrislusf/seaweedfs/weed/stats"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
@@ -19,21 +21,25 @@ type EcVolumeShard struct {
dir string
ecdFile *os.File
ecdFileSize int64
+ DiskType types.DiskType
}
-func NewEcVolumeShard(dirname string, collection string, id needle.VolumeId, shardId ShardId) (v *EcVolumeShard, e error) {
+func NewEcVolumeShard(diskType types.DiskType, dirname string, collection string, id needle.VolumeId, shardId ShardId) (v *EcVolumeShard, e error) {
- v = &EcVolumeShard{dir: dirname, Collection: collection, VolumeId: id, ShardId: shardId}
+ v = &EcVolumeShard{dir: dirname, Collection: collection, VolumeId: id, ShardId: shardId, DiskType: diskType}
baseFileName := v.FileName()
// open ecd file
if v.ecdFile, e = os.OpenFile(baseFileName+ToExt(int(shardId)), os.O_RDONLY, 0644); e != nil {
- return nil, fmt.Errorf("cannot read ec volume shard %s.%s: %v", baseFileName, ToExt(int(shardId)), e)
+ if e == os.ErrNotExist || strings.Contains(e.Error(), "no such file or directory") {
+ return nil, os.ErrNotExist
+ }
+ return nil, fmt.Errorf("cannot read ec volume shard %s%s: %v", baseFileName, ToExt(int(shardId)), e)
}
ecdFi, statErr := v.ecdFile.Stat()
if statErr != nil {
- return nil, fmt.Errorf("can not stat ec volume shard %s.%s: %v", baseFileName, ToExt(int(shardId)), statErr)
+ return nil, fmt.Errorf("can not stat ec volume shard %s%s: %v", baseFileName, ToExt(int(shardId)), statErr)
}
v.ecdFileSize = ecdFi.Size()
diff --git a/weed/storage/erasure_coding/ec_test.go b/weed/storage/erasure_coding/ec_test.go
index 0e4aaa27c..0d48bec02 100644
--- a/weed/storage/erasure_coding/ec_test.go
+++ b/weed/storage/erasure_coding/ec_test.go
@@ -7,9 +7,10 @@ import (
"os"
"testing"
+ "github.com/klauspost/reedsolomon"
+
"github.com/chrislusf/seaweedfs/weed/storage/needle_map"
"github.com/chrislusf/seaweedfs/weed/storage/types"
- "github.com/klauspost/reedsolomon"
)
const (
@@ -41,9 +42,10 @@ func TestEncodingDecoding(t *testing.T) {
}
func validateFiles(baseFileName string) error {
- cm, err := readCompactMap(baseFileName)
+ nm, err := readNeedleMap(baseFileName)
+ defer nm.Close()
if err != nil {
- return fmt.Errorf("readCompactMap: %v", err)
+ return fmt.Errorf("readNeedleMap: %v", err)
}
datFile, err := os.OpenFile(baseFileName+".dat", os.O_RDONLY, 0)
@@ -60,7 +62,7 @@ func validateFiles(baseFileName string) error {
ecFiles, err := openEcFiles(baseFileName, true)
defer closeEcFiles(ecFiles)
- err = cm.AscendingVisit(func(value needle_map.NeedleValue) error {
+ err = nm.AscendingVisit(func(value needle_map.NeedleValue) error {
return assertSame(datFile, fi.Size(), ecFiles, value.Offset, value.Size)
})
if err != nil {
@@ -69,7 +71,7 @@ func validateFiles(baseFileName string) error {
return nil
}
-func assertSame(datFile *os.File, datSize int64, ecFiles []*os.File, offset types.Offset, size uint32) error {
+func assertSame(datFile *os.File, datSize int64, ecFiles []*os.File, offset types.Offset, size types.Size) error {
data, err := readDatFile(datFile, offset, size)
if err != nil {
@@ -88,10 +90,10 @@ func assertSame(datFile *os.File, datSize int64, ecFiles []*os.File, offset type
return nil
}
-func readDatFile(datFile *os.File, offset types.Offset, size uint32) ([]byte, error) {
+func readDatFile(datFile *os.File, offset types.Offset, size types.Size) ([]byte, error) {
data := make([]byte, size)
- n, err := datFile.ReadAt(data, offset.ToAcutalOffset())
+ n, err := datFile.ReadAt(data, offset.ToActualOffset())
if err != nil {
return nil, fmt.Errorf("failed to ReadAt dat file: %v", err)
}
@@ -101,9 +103,9 @@ func readDatFile(datFile *os.File, offset types.Offset, size uint32) ([]byte, er
return data, nil
}
-func readEcFile(datSize int64, ecFiles []*os.File, offset types.Offset, size uint32) (data []byte, err error) {
+func readEcFile(datSize int64, ecFiles []*os.File, offset types.Offset, size types.Size) (data []byte, err error) {
- intervals := LocateData(largeBlockSize, smallBlockSize, datSize, offset.ToAcutalOffset(), size)
+ intervals := LocateData(largeBlockSize, smallBlockSize, datSize, offset.ToActualOffset(), size)
for i, interval := range intervals {
if d, e := readOneInterval(interval, ecFiles); e != nil {
@@ -138,7 +140,7 @@ func readOneInterval(interval Interval, ecFiles []*os.File) (data []byte, err er
return
}
-func readFromOtherEcFiles(ecFiles []*os.File, ecFileIndex int, ecFileOffset int64, size uint32) (data []byte, err error) {
+func readFromOtherEcFiles(ecFiles []*os.File, ecFileIndex int, ecFileOffset int64, size types.Size) (data []byte, err error) {
enc, err := reedsolomon.New(DataShardsCount, ParityShardsCount)
if err != nil {
return nil, fmt.Errorf("failed to create encoder: %v", err)
diff --git a/weed/storage/erasure_coding/ec_volume.go b/weed/storage/erasure_coding/ec_volume.go
index 579f037fb..171db92a4 100644
--- a/weed/storage/erasure_coding/ec_volume.go
+++ b/weed/storage/erasure_coding/ec_volume.go
@@ -25,6 +25,7 @@ type EcVolume struct {
VolumeId needle.VolumeId
Collection string
dir string
+ dirIdx string
ecxFile *os.File
ecxFileSize int64
ecxCreatedAt time.Time
@@ -35,35 +36,37 @@ type EcVolume struct {
Version needle.Version
ecjFile *os.File
ecjFileAccessLock sync.Mutex
+ diskType types.DiskType
}
-func NewEcVolume(dir string, collection string, vid needle.VolumeId) (ev *EcVolume, err error) {
- ev = &EcVolume{dir: dir, Collection: collection, VolumeId: vid}
+func NewEcVolume(diskType types.DiskType, dir string, dirIdx string, collection string, vid needle.VolumeId) (ev *EcVolume, err error) {
+ ev = &EcVolume{dir: dir, dirIdx: dirIdx, Collection: collection, VolumeId: vid, diskType: diskType}
- baseFileName := EcShardFileName(collection, dir, int(vid))
+ dataBaseFileName := EcShardFileName(collection, dir, int(vid))
+ indexBaseFileName := EcShardFileName(collection, dirIdx, int(vid))
// open ecx file
- if ev.ecxFile, err = os.OpenFile(baseFileName+".ecx", os.O_RDWR, 0644); err != nil {
- return nil, fmt.Errorf("cannot open ec volume index %s.ecx: %v", baseFileName, err)
+ if ev.ecxFile, err = os.OpenFile(indexBaseFileName+".ecx", os.O_RDWR, 0644); err != nil {
+ return nil, fmt.Errorf("cannot open ec volume index %s.ecx: %v", indexBaseFileName, err)
}
ecxFi, statErr := ev.ecxFile.Stat()
if statErr != nil {
- return nil, fmt.Errorf("can not stat ec volume index %s.ecx: %v", baseFileName, statErr)
+ return nil, fmt.Errorf("can not stat ec volume index %s.ecx: %v", indexBaseFileName, statErr)
}
ev.ecxFileSize = ecxFi.Size()
ev.ecxCreatedAt = ecxFi.ModTime()
// open ecj file
- if ev.ecjFile, err = os.OpenFile(baseFileName+".ecj", os.O_RDWR|os.O_CREATE, 0644); err != nil {
- return nil, fmt.Errorf("cannot open ec volume journal %s.ecj: %v", baseFileName, err)
+ if ev.ecjFile, err = os.OpenFile(indexBaseFileName+".ecj", os.O_RDWR|os.O_CREATE, 0644); err != nil {
+ return nil, fmt.Errorf("cannot open ec volume journal %s.ecj: %v", indexBaseFileName, err)
}
// read volume info
ev.Version = needle.Version3
- if volumeInfo, found := pb.MaybeLoadVolumeInfo(baseFileName + ".vif"); found {
+ if volumeInfo, _, found, _ := pb.MaybeLoadVolumeInfo(dataBaseFileName + ".vif"); found {
ev.Version = needle.Version(volumeInfo.Version)
} else {
- pb.SaveVolumeInfo(baseFileName+".vif", &volume_server_pb.VolumeInfo{Version: uint32(ev.Version)})
+ pb.SaveVolumeInfo(dataBaseFileName+".vif", &volume_server_pb.VolumeInfo{Version: uint32(ev.Version)})
}
ev.ShardLocations = make(map[ShardId][]string)
@@ -134,24 +137,42 @@ func (ev *EcVolume) Destroy() {
for _, s := range ev.Shards {
s.Destroy()
}
- os.Remove(ev.FileName() + ".ecx")
- os.Remove(ev.FileName() + ".ecj")
- os.Remove(ev.FileName() + ".vif")
+ os.Remove(ev.FileName(".ecx"))
+ os.Remove(ev.FileName(".ecj"))
+ os.Remove(ev.FileName(".vif"))
}
-func (ev *EcVolume) FileName() string {
+func (ev *EcVolume) FileName(ext string) string {
+ switch ext {
+ case ".ecx", ".ecj":
+ return ev.IndexBaseFileName() + ext
+ }
+ // .vif
+ return ev.DataBaseFileName() + ext
+}
+func (ev *EcVolume) DataBaseFileName() string {
return EcShardFileName(ev.Collection, ev.dir, int(ev.VolumeId))
+}
+func (ev *EcVolume) IndexBaseFileName() string {
+ return EcShardFileName(ev.Collection, ev.dirIdx, int(ev.VolumeId))
}
-func (ev *EcVolume) ShardSize() int64 {
+func (ev *EcVolume) ShardSize() uint64 {
if len(ev.Shards) > 0 {
- return ev.Shards[0].Size()
+ return uint64(ev.Shards[0].Size())
}
return 0
}
+func (ev *EcVolume) Size() (size int64) {
+ for _, shard := range ev.Shards {
+ size += shard.Size()
+ }
+ return
+}
+
func (ev *EcVolume) CreatedAt() time.Time {
return ev.ecxCreatedAt
}
@@ -171,6 +192,7 @@ func (ev *EcVolume) ToVolumeEcShardInformationMessage() (messages []*master_pb.V
m = &master_pb.VolumeEcShardInformationMessage{
Id: uint32(s.VolumeId),
Collection: s.Collection,
+ DiskType: string(ev.diskType),
}
messages = append(messages, m)
}
@@ -180,7 +202,7 @@ func (ev *EcVolume) ToVolumeEcShardInformationMessage() (messages []*master_pb.V
return
}
-func (ev *EcVolume) LocateEcShardNeedle(needleId types.NeedleId, version needle.Version) (offset types.Offset, size uint32, intervals []Interval, err error) {
+func (ev *EcVolume) LocateEcShardNeedle(needleId types.NeedleId, version needle.Version) (offset types.Offset, size types.Size, intervals []Interval, err error) {
// find the needle from ecx file
offset, size, err = ev.FindNeedleFromEcx(needleId)
@@ -191,16 +213,16 @@ func (ev *EcVolume) LocateEcShardNeedle(needleId types.NeedleId, version needle.
shard := ev.Shards[0]
// calculate the locations in the ec shards
- intervals = LocateData(ErasureCodingLargeBlockSize, ErasureCodingSmallBlockSize, DataShardsCount*shard.ecdFileSize, offset.ToAcutalOffset(), uint32(needle.GetActualSize(size, version)))
+ intervals = LocateData(ErasureCodingLargeBlockSize, ErasureCodingSmallBlockSize, DataShardsCount*shard.ecdFileSize, offset.ToActualOffset(), types.Size(needle.GetActualSize(size, version)))
return
}
-func (ev *EcVolume) FindNeedleFromEcx(needleId types.NeedleId) (offset types.Offset, size uint32, err error) {
+func (ev *EcVolume) FindNeedleFromEcx(needleId types.NeedleId) (offset types.Offset, size types.Size, err error) {
return SearchNeedleFromSortedIndex(ev.ecxFile, ev.ecxFileSize, needleId, nil)
}
-func SearchNeedleFromSortedIndex(ecxFile *os.File, ecxFileSize int64, needleId types.NeedleId, processNeedleFn func(file *os.File, offset int64) error) (offset types.Offset, size uint32, err error) {
+func SearchNeedleFromSortedIndex(ecxFile *os.File, ecxFileSize int64, needleId types.NeedleId, processNeedleFn func(file *os.File, offset int64) error) (offset types.Offset, size types.Size, err error) {
var key types.NeedleId
buf := make([]byte, types.NeedleMapEntrySize)
l, h := int64(0), ecxFileSize/types.NeedleMapEntrySize
diff --git a/weed/storage/erasure_coding/ec_volume_delete.go b/weed/storage/erasure_coding/ec_volume_delete.go
index 822a9e923..a7f8c24a3 100644
--- a/weed/storage/erasure_coding/ec_volume_delete.go
+++ b/weed/storage/erasure_coding/ec_volume_delete.go
@@ -12,7 +12,7 @@ import (
var (
MarkNeedleDeleted = func(file *os.File, offset int64) error {
b := make([]byte, types.SizeSize)
- util.Uint32toBytes(b, types.TombstoneFileSize)
+ types.SizeToBytes(b, types.TombstoneFileSize)
n, err := file.WriteAt(b, offset+types.NeedleIdSize+types.OffsetSize)
if err != nil {
return fmt.Errorf("sorted needle write error: %v", err)
diff --git a/weed/storage/erasure_coding/ec_volume_info.go b/weed/storage/erasure_coding/ec_volume_info.go
index 8ff65bb0f..3dd535e64 100644
--- a/weed/storage/erasure_coding/ec_volume_info.go
+++ b/weed/storage/erasure_coding/ec_volume_info.go
@@ -10,13 +10,15 @@ type EcVolumeInfo struct {
VolumeId needle.VolumeId
Collection string
ShardBits ShardBits
+ DiskType string
}
-func NewEcVolumeInfo(collection string, vid needle.VolumeId, shardBits ShardBits) *EcVolumeInfo {
+func NewEcVolumeInfo(diskType string, collection string, vid needle.VolumeId, shardBits ShardBits) *EcVolumeInfo {
return &EcVolumeInfo{
Collection: collection,
VolumeId: vid,
ShardBits: shardBits,
+ DiskType: diskType,
}
}
@@ -45,6 +47,7 @@ func (ecInfo *EcVolumeInfo) Minus(other *EcVolumeInfo) *EcVolumeInfo {
VolumeId: ecInfo.VolumeId,
Collection: ecInfo.Collection,
ShardBits: ecInfo.ShardBits.Minus(other.ShardBits),
+ DiskType: ecInfo.DiskType,
}
return ret
@@ -55,6 +58,7 @@ func (ecInfo *EcVolumeInfo) ToVolumeEcShardInformationMessage() (ret *master_pb.
Id: uint32(ecInfo.VolumeId),
EcIndexBits: uint32(ecInfo.ShardBits),
Collection: ecInfo.Collection,
+ DiskType: ecInfo.DiskType,
}
}
diff --git a/weed/storage/erasure_coding/ec_volume_test.go b/weed/storage/erasure_coding/ec_volume_test.go
new file mode 100644
index 000000000..747ef4aab
--- /dev/null
+++ b/weed/storage/erasure_coding/ec_volume_test.go
@@ -0,0 +1,54 @@
+package erasure_coding
+
+import (
+ "fmt"
+ "os"
+ "testing"
+
+ "github.com/stretchr/testify/assert"
+
+ "github.com/chrislusf/seaweedfs/weed/storage/needle"
+ "github.com/chrislusf/seaweedfs/weed/storage/types"
+)
+
+func TestPositioning(t *testing.T) {
+
+ ecxFile, err := os.OpenFile("389.ecx", os.O_RDONLY, 0)
+ if err != nil {
+ t.Errorf("failed to open ecx file: %v", err)
+ }
+ defer ecxFile.Close()
+
+ stat, _ := ecxFile.Stat()
+ fileSize := stat.Size()
+
+ tests := []struct {
+ needleId string
+ offset int64
+ size int
+ }{
+ {needleId: "0f0edb92", offset: 31300679656, size: 1167},
+ {needleId: "0ef7d7f8", offset: 11513014944, size: 66044},
+ }
+
+ for _, test := range tests {
+ needleId, _ := types.ParseNeedleId(test.needleId)
+ offset, size, err := SearchNeedleFromSortedIndex(ecxFile, fileSize, needleId, nil)
+ assert.Equal(t, nil, err, "SearchNeedleFromSortedIndex")
+ fmt.Printf("offset: %d size: %d\n", offset.ToActualOffset(), size)
+ }
+
+ needleId, _ := types.ParseNeedleId("0f087622")
+ offset, size, err := SearchNeedleFromSortedIndex(ecxFile, fileSize, needleId, nil)
+ assert.Equal(t, nil, err, "SearchNeedleFromSortedIndex")
+ fmt.Printf("offset: %d size: %d\n", offset.ToActualOffset(), size)
+
+ var shardEcdFileSize int64 = 1118830592 // 1024*1024*1024*3
+ intervals := LocateData(ErasureCodingLargeBlockSize, ErasureCodingSmallBlockSize, DataShardsCount*shardEcdFileSize, offset.ToActualOffset(), types.Size(needle.GetActualSize(size, needle.CurrentVersion)))
+
+ for _, interval := range intervals {
+ shardId, shardOffset := interval.ToShardIdAndOffset(ErasureCodingLargeBlockSize, ErasureCodingSmallBlockSize)
+ fmt.Printf("interval: %+v, shardId: %d, shardOffset: %d\n", interval, shardId, shardOffset)
+ }
+
+}
diff --git a/weed/storage/idx/walk.go b/weed/storage/idx/walk.go
index 90efb75e6..5215d3c4f 100644
--- a/weed/storage/idx/walk.go
+++ b/weed/storage/idx/walk.go
@@ -2,25 +2,26 @@ package idx
import (
"io"
- "os"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/storage/types"
- "github.com/chrislusf/seaweedfs/weed/util"
)
// walks through the index file, calls fn function with each key, offset, size
// stops with the error returned by the fn function
-func WalkIndexFile(r *os.File, fn func(key types.NeedleId, offset types.Offset, size uint32) error) error {
+func WalkIndexFile(r io.ReaderAt, fn func(key types.NeedleId, offset types.Offset, size types.Size) error) error {
var readerOffset int64
bytes := make([]byte, types.NeedleMapEntrySize*RowsToRead)
count, e := r.ReadAt(bytes, readerOffset)
- glog.V(3).Infoln("file", r.Name(), "readerOffset", readerOffset, "count", count, "e", e)
+ if count == 0 && e == io.EOF {
+ return nil
+ }
+ glog.V(3).Infof("readerOffset %d count %d err: %v", readerOffset, count, e)
readerOffset += int64(count)
var (
key types.NeedleId
offset types.Offset
- size uint32
+ size types.Size
i int
)
@@ -35,16 +36,16 @@ func WalkIndexFile(r *os.File, fn func(key types.NeedleId, offset types.Offset,
return nil
}
count, e = r.ReadAt(bytes, readerOffset)
- glog.V(3).Infoln("file", r.Name(), "readerOffset", readerOffset, "count", count, "e", e)
+ glog.V(3).Infof("readerOffset %d count %d err: %v", readerOffset, count, e)
readerOffset += int64(count)
}
return e
}
-func IdxFileEntry(bytes []byte) (key types.NeedleId, offset types.Offset, size uint32) {
+func IdxFileEntry(bytes []byte) (key types.NeedleId, offset types.Offset, size types.Size) {
key = types.BytesToNeedleId(bytes[:types.NeedleIdSize])
offset = types.BytesToOffset(bytes[types.NeedleIdSize : types.NeedleIdSize+types.OffsetSize])
- size = util.BytesToUint32(bytes[types.NeedleIdSize+types.OffsetSize : types.NeedleIdSize+types.OffsetSize+types.SizeSize])
+ size = types.BytesToSize(bytes[types.NeedleIdSize+types.OffsetSize : types.NeedleIdSize+types.OffsetSize+types.SizeSize])
return
}
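
Accepting io.ReaderAt instead of *os.File means any random-access source can be walked, including in-memory buffers in tests, and the new early return makes an empty index a clean no-op. A generalized sketch of the chunked ReadAt walk; the 16-byte record layout below is a toy, not the exact weed entry format:

```go
package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
	"io"
)

const entrySize = 16 // toy layout: 8-byte key, 4-byte offset, 4-byte size

func walk(r io.ReaderAt, fn func(key uint64, offset, size uint32) error) error {
	buf := make([]byte, entrySize*1024)
	var pos int64
	for {
		n, err := r.ReadAt(buf, pos)
		if n == 0 && err == io.EOF {
			return nil // empty tail: done, mirroring the new early return
		}
		for i := 0; i+entrySize <= n; i += entrySize {
			key := binary.BigEndian.Uint64(buf[i : i+8])
			off := binary.BigEndian.Uint32(buf[i+8 : i+12])
			size := binary.BigEndian.Uint32(buf[i+12 : i+16])
			if e := fn(key, off, size); e != nil {
				return e
			}
		}
		pos += int64(n)
		if err == io.EOF {
			return nil
		}
		if err != nil {
			return err
		}
	}
}

func main() {
	record := make([]byte, entrySize)
	binary.BigEndian.PutUint64(record[0:8], 42)
	binary.BigEndian.PutUint32(record[8:12], 8)
	binary.BigEndian.PutUint32(record[12:16], 1167)
	err := walk(bytes.NewReader(record), func(k uint64, o, s uint32) error {
		fmt.Printf("key=%d offset=%d size=%d\n", k, o, s)
		return nil
	})
	fmt.Println("err:", err)
}
```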
diff --git a/weed/storage/needle/async_request.go b/weed/storage/needle/async_request.go
new file mode 100644
index 000000000..ea02c55c5
--- /dev/null
+++ b/weed/storage/needle/async_request.go
@@ -0,0 +1,53 @@
+package needle
+
+type AsyncRequest struct {
+ N *Needle
+ IsWriteRequest bool
+ ActualSize int64
+ offset uint64
+ size uint64
+ doneChan chan interface{}
+ isUnchanged bool
+ err error
+}
+
+func NewAsyncRequest(n *Needle, isWriteRequest bool) *AsyncRequest {
+ return &AsyncRequest{
+ offset: 0,
+ size: 0,
+ ActualSize: 0,
+ doneChan: make(chan interface{}),
+ N: n,
+ isUnchanged: false,
+ IsWriteRequest: isWriteRequest,
+ err: nil,
+ }
+}
+
+func (r *AsyncRequest) WaitComplete() (uint64, uint64, bool, error) {
+ <-r.doneChan
+ return r.offset, r.size, r.isUnchanged, r.err
+}
+
+func (r *AsyncRequest) Complete(offset uint64, size uint64, isUnchanged bool, err error) {
+ r.offset = offset
+ r.size = size
+ r.isUnchanged = isUnchanged
+ r.err = err
+ close(r.doneChan)
+}
+
+func (r *AsyncRequest) UpdateResult(offset uint64, size uint64, isUnchanged bool, err error) {
+ r.offset = offset
+ r.size = size
+ r.isUnchanged = isUnchanged
+ r.err = err
+}
+
+func (r *AsyncRequest) Submit() {
+ close(r.doneChan)
+}
+
+func (r *AsyncRequest) IsSucceed() bool {
+ return r.err == nil
+}
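
AsyncRequest uses the close-a-channel completion idiom: the volume writer goroutine records the result and closes doneChan, and every caller blocked in WaitComplete wakes exactly once. A toy usage sketch of the same pattern:

```go
package main

import "fmt"

type asyncRequest struct {
	offset, size uint64
	err          error
	doneChan     chan struct{}
}

func newAsyncRequest() *asyncRequest {
	return &asyncRequest{doneChan: make(chan struct{})}
}

// Complete is called once by the writer side; closing the channel
// releases all waiters.
func (r *asyncRequest) Complete(offset, size uint64, err error) {
	r.offset, r.size, r.err = offset, size, err
	close(r.doneChan)
}

// WaitComplete blocks until Complete has run, then returns the result.
func (r *asyncRequest) WaitComplete() (uint64, uint64, error) {
	<-r.doneChan
	return r.offset, r.size, r.err
}

func main() {
	req := newAsyncRequest()
	go req.Complete(4096, 1167, nil) // the writer side
	off, size, err := req.WaitComplete()
	fmt.Println(off, size, err) // 4096 1167 <nil>
}
```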
diff --git a/weed/storage/needle/crc.go b/weed/storage/needle/crc.go
index 00ea1db69..4476631c2 100644
--- a/weed/storage/needle/crc.go
+++ b/weed/storage/needle/crc.go
@@ -1,11 +1,12 @@
package needle
import (
- "crypto/md5"
"fmt"
+ "io"
- "github.com/chrislusf/seaweedfs/weed/util"
"github.com/klauspost/crc32"
+
+ "github.com/chrislusf/seaweedfs/weed/util"
)
var table = crc32.MakeTable(crc32.Castagnoli)
@@ -30,12 +31,24 @@ func (n *Needle) Etag() string {
return fmt.Sprintf("%x", bits)
}
-func (n *Needle) MD5() string {
+func NewCRCwriter(w io.Writer) *CRCwriter {
- hash := md5.New()
+ return &CRCwriter{
+ crc: CRC(0),
+ w: w,
+ }
- hash.Write(n.Data)
+}
- return fmt.Sprintf("%x", hash.Sum(nil))
+type CRCwriter struct {
+ crc CRC
+ w io.Writer
+}
+func (c *CRCwriter) Write(p []byte) (n int, err error) {
+ n, err = c.w.Write(p) // with each write ...
+ c.crc = c.crc.Update(p)
+ return
}
+
+func (c *CRCwriter) Sum() uint32 { return c.crc.Value() } // final hash
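
CRCwriter computes the checksum incrementally while the payload streams through, instead of hashing a fully buffered copy. A usage sketch of the same idea against the standard library's hash/crc32 (the diff uses the API-compatible klauspost/crc32 fork):

```go
package main

import (
	"bytes"
	"fmt"
	"hash/crc32"
	"io"
)

var castagnoli = crc32.MakeTable(crc32.Castagnoli)

type crcWriter struct {
	crc uint32
	w   io.Writer
}

// Write forwards to the wrapped writer and folds the written bytes
// into the running CRC.
func (c *crcWriter) Write(p []byte) (int, error) {
	n, err := c.w.Write(p)
	c.crc = crc32.Update(c.crc, castagnoli, p[:n])
	return n, err
}

func (c *crcWriter) Sum() uint32 { return c.crc } // final hash

func main() {
	var buf bytes.Buffer
	cw := &crcWriter{w: &buf}
	io.WriteString(cw, "hello ")
	io.WriteString(cw, "world")
	fmt.Printf("crc32c=%08x bytes=%d\n", cw.Sum(), buf.Len())
}
```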
diff --git a/weed/storage/needle/file_id.go b/weed/storage/needle/file_id.go
index 5dabb0f25..6055bdd1c 100644
--- a/weed/storage/needle/file_id.go
+++ b/weed/storage/needle/file_id.go
@@ -66,7 +66,7 @@ func formatNeedleIdCookie(key NeedleId, cookie Cookie) string {
NeedleIdToBytes(bytes[0:NeedleIdSize], key)
CookieToBytes(bytes[NeedleIdSize:NeedleIdSize+CookieSize], cookie)
nonzero_index := 0
- for ; bytes[nonzero_index] == 0; nonzero_index++ {
+ for ; bytes[nonzero_index] == 0 && nonzero_index < NeedleIdSize; nonzero_index++ {
}
return hex.EncodeToString(bytes[nonzero_index:])
}
diff --git a/weed/storage/needle/needle.go b/weed/storage/needle/needle.go
index 2f03ba87b..34d29ab6e 100644
--- a/weed/storage/needle/needle.go
+++ b/weed/storage/needle/needle.go
@@ -8,8 +8,6 @@ import (
"strings"
"time"
- "io/ioutil"
-
"github.com/chrislusf/seaweedfs/weed/images"
. "github.com/chrislusf/seaweedfs/weed/storage/types"
)
@@ -26,7 +24,7 @@ const (
type Needle struct {
Cookie Cookie `comment:"random number to mitigate brute force lookups"`
Id NeedleId `comment:"needle id"`
- Size uint32 `comment:"sum of DataSize,Data,NameSize,Name,MimeSize,Mime"`
+ Size Size `comment:"sum of DataSize,Data,NameSize,Name,MimeSize,Mime"`
DataSize uint32 `comment:"Data size"` //version2
Data []byte `comment:"The actual file data"`
@@ -46,57 +44,33 @@ type Needle struct {
}
func (n *Needle) String() (str string) {
- str = fmt.Sprintf("%s Size:%d, DataSize:%d, Name:%s, Mime:%s", formatNeedleIdCookie(n.Id, n.Cookie), n.Size, n.DataSize, n.Name, n.Mime)
+	str = fmt.Sprintf("%s Size:%d, DataSize:%d, Name:%s, Mime:%s, Compressed:%v", formatNeedleIdCookie(n.Id, n.Cookie), n.Size, n.DataSize, n.Name, n.Mime, n.IsCompressed())
return
}
-func ParseUpload(r *http.Request) (
- fileName string, data []byte, mimeType string, pairMap map[string]string, isGzipped bool, originalDataSize int,
- modifiedTime uint64, ttl *TTL, isChunkedFile bool, e error) {
- pairMap = make(map[string]string)
- for k, v := range r.Header {
- if len(v) > 0 && strings.HasPrefix(k, PairNamePrefix) {
- pairMap[k] = v[0]
- }
- }
-
- if r.Method == "POST" {
- fileName, data, mimeType, isGzipped, originalDataSize, isChunkedFile, e = parseMultipart(r)
- } else {
- isGzipped = false
- mimeType = r.Header.Get("Content-Type")
- fileName = ""
- data, e = ioutil.ReadAll(r.Body)
- originalDataSize = len(data)
- }
- if e != nil {
- return
- }
-
- modifiedTime, _ = strconv.ParseUint(r.FormValue("ts"), 10, 64)
- ttl, _ = ReadTTL(r.FormValue("ttl"))
-
- return
-}
-func CreateNeedleFromRequest(r *http.Request, fixJpgOrientation bool) (n *Needle, originalSize int, e error) {
- var pairMap map[string]string
- fname, mimeType, isGzipped, isChunkedFile := "", "", false, false
+func CreateNeedleFromRequest(r *http.Request, fixJpgOrientation bool, sizeLimit int64) (n *Needle, originalSize int, contentMd5 string, e error) {
n = new(Needle)
- fname, n.Data, mimeType, pairMap, isGzipped, originalSize, n.LastModified, n.Ttl, isChunkedFile, e = ParseUpload(r)
+ pu, e := ParseUpload(r, sizeLimit)
if e != nil {
return
}
- if len(fname) < 256 {
- n.Name = []byte(fname)
+ n.Data = pu.Data
+ originalSize = pu.OriginalDataSize
+ n.LastModified = pu.ModifiedTime
+ n.Ttl = pu.Ttl
+ contentMd5 = pu.ContentMd5
+
+ if len(pu.FileName) < 256 {
+ n.Name = []byte(pu.FileName)
n.SetHasName()
}
- if len(mimeType) < 256 {
- n.Mime = []byte(mimeType)
+ if len(pu.MimeType) < 256 {
+ n.Mime = []byte(pu.MimeType)
n.SetHasMime()
}
- if len(pairMap) != 0 {
+ if len(pu.PairMap) != 0 {
trimmedPairMap := make(map[string]string)
- for k, v := range pairMap {
+ for k, v := range pu.PairMap {
trimmedPairMap[k[len(PairNamePrefix):]] = v
}
@@ -107,8 +81,9 @@ func CreateNeedleFromRequest(r *http.Request, fixJpgOrientation bool) (n *Needle
n.SetHasPairs()
}
}
- if isGzipped {
- n.SetGzipped()
+ if pu.IsGzipped {
+ // println(r.URL.Path, "is set to compressed", pu.FileName, pu.IsGzipped, "dataSize", pu.OriginalDataSize)
+ n.SetIsCompressed()
}
if n.LastModified == 0 {
n.LastModified = uint64(time.Now().Unix())
@@ -118,13 +93,13 @@ func CreateNeedleFromRequest(r *http.Request, fixJpgOrientation bool) (n *Needle
n.SetHasTtl()
}
- if isChunkedFile {
+ if pu.IsChunkedFile {
n.SetIsChunkManifest()
}
if fixJpgOrientation {
- loweredName := strings.ToLower(fname)
- if mimeType == "image/jpeg" || strings.HasSuffix(loweredName, ".jpg") || strings.HasSuffix(loweredName, ".jpeg") {
+ loweredName := strings.ToLower(pu.FileName)
+ if pu.MimeType == "image/jpeg" || strings.HasSuffix(loweredName, ".jpg") || strings.HasSuffix(loweredName, ".jpeg") {
n.Data = images.FixJpgOrientation(n.Data)
}
}
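
CreateNeedleFromRequest now takes a sizeLimit and returns the upload's base64-encoded MD5 alongside the needle. A hedged sketch of a caller — the handler name and the 32 MiB cap are illustrative, not from this commit:

package main

import (
	"net/http"

	"github.com/chrislusf/seaweedfs/weed/storage/needle"
)

func handleUpload(w http.ResponseWriter, r *http.Request) {
	// fixJpgOrientation=true, and a hypothetical 32 MiB upload cap
	n, originalSize, contentMd5, err := needle.CreateNeedleFromRequest(r, true, 32*1024*1024)
	if err != nil {
		http.Error(w, err.Error(), http.StatusBadRequest)
		return
	}
	_ = n            // ready to be appended to a volume
	_ = originalSize // pre-compression size, useful for accounting
	w.Header().Set("Content-MD5", contentMd5)
}

func main() {
	http.HandleFunc("/submit", handleUpload)
	http.ListenAndServe(":8080", nil)
}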
diff --git a/weed/storage/needle/needle_parse_multipart.go b/weed/storage/needle/needle_parse_multipart.go
deleted file mode 100644
index 8be1a1da4..000000000
--- a/weed/storage/needle/needle_parse_multipart.go
+++ /dev/null
@@ -1,109 +0,0 @@
-package needle
-
-import (
- "github.com/chrislusf/seaweedfs/weed/glog"
- "github.com/chrislusf/seaweedfs/weed/util"
-
- "io"
- "io/ioutil"
- "mime"
- "net/http"
- "path"
- "strconv"
- "strings"
-)
-
-func parseMultipart(r *http.Request) (
- fileName string, data []byte, mimeType string, isGzipped bool, originalDataSize int, isChunkedFile bool, e error) {
- defer func() {
- if e != nil && r.Body != nil {
- io.Copy(ioutil.Discard, r.Body)
- r.Body.Close()
- }
- }()
- form, fe := r.MultipartReader()
- if fe != nil {
- glog.V(0).Infoln("MultipartReader [ERROR]", fe)
- e = fe
- return
- }
-
- //first multi-part item
- part, fe := form.NextPart()
- if fe != nil {
- glog.V(0).Infoln("Reading Multi part [ERROR]", fe)
- e = fe
- return
- }
-
- fileName = part.FileName()
- if fileName != "" {
- fileName = path.Base(fileName)
- }
-
- data, e = ioutil.ReadAll(part)
- if e != nil {
- glog.V(0).Infoln("Reading Content [ERROR]", e)
- return
- }
-
- //if the filename is empty string, do a search on the other multi-part items
- for fileName == "" {
- part2, fe := form.NextPart()
- if fe != nil {
- break // no more or on error, just safely break
- }
-
- fName := part2.FileName()
-
- //found the first <file type> multi-part has filename
- if fName != "" {
- data2, fe2 := ioutil.ReadAll(part2)
- if fe2 != nil {
- glog.V(0).Infoln("Reading Content [ERROR]", fe2)
- e = fe2
- return
- }
-
- //update
- data = data2
- fileName = path.Base(fName)
- break
- }
- }
-
- originalDataSize = len(data)
-
- isChunkedFile, _ = strconv.ParseBool(r.FormValue("cm"))
-
- if !isChunkedFile {
-
- dotIndex := strings.LastIndex(fileName, ".")
- ext, mtype := "", ""
- if dotIndex > 0 {
- ext = strings.ToLower(fileName[dotIndex:])
- mtype = mime.TypeByExtension(ext)
- }
- contentType := part.Header.Get("Content-Type")
- if contentType != "" && mtype != contentType {
- mimeType = contentType //only return mime type if not deductable
- mtype = contentType
- }
-
- if part.Header.Get("Content-Encoding") == "gzip" {
- if unzipped, e := util.UnGzipData(data); e == nil {
- originalDataSize = len(unzipped)
- }
- isGzipped = true
- } else if util.IsGzippable(ext, mtype, data) {
- if compressedData, err := util.GzipData(data); err == nil {
- if len(data) > len(compressedData) {
- data = compressedData
- isGzipped = true
- }
- }
- }
- }
-
- return
-}
diff --git a/weed/storage/needle/needle_parse_upload.go b/weed/storage/needle/needle_parse_upload.go
new file mode 100644
index 000000000..7201503f1
--- /dev/null
+++ b/weed/storage/needle/needle_parse_upload.go
@@ -0,0 +1,201 @@
+package needle
+
+import (
+ "crypto/md5"
+ "encoding/base64"
+ "fmt"
+ "io"
+ "io/ioutil"
+ "mime"
+ "net/http"
+ "path"
+ "path/filepath"
+ "strconv"
+ "strings"
+
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/util"
+)
+
+type ParsedUpload struct {
+ FileName string
+ Data []byte
+ MimeType string
+ PairMap map[string]string
+ IsGzipped bool
+ // IsZstd bool
+ OriginalDataSize int
+ ModifiedTime uint64
+ Ttl *TTL
+ IsChunkedFile bool
+ UncompressedData []byte
+ ContentMd5 string
+}
+
+func ParseUpload(r *http.Request, sizeLimit int64) (pu *ParsedUpload, e error) {
+ pu = &ParsedUpload{}
+ pu.PairMap = make(map[string]string)
+ for k, v := range r.Header {
+ if len(v) > 0 && strings.HasPrefix(k, PairNamePrefix) {
+ pu.PairMap[k] = v[0]
+ }
+ }
+
+ if r.Method == "POST" {
+ e = parseMultipart(r, sizeLimit, pu)
+ } else {
+ e = parsePut(r, sizeLimit, pu)
+ }
+ if e != nil {
+ return
+ }
+
+ pu.ModifiedTime, _ = strconv.ParseUint(r.FormValue("ts"), 10, 64)
+ pu.Ttl, _ = ReadTTL(r.FormValue("ttl"))
+
+ pu.OriginalDataSize = len(pu.Data)
+ pu.UncompressedData = pu.Data
+ // println("received data", len(pu.Data), "isGzipped", pu.IsGzipped, "mime", pu.MimeType, "name", pu.FileName)
+ if pu.IsGzipped {
+ if unzipped, e := util.DecompressData(pu.Data); e == nil {
+ pu.OriginalDataSize = len(unzipped)
+ pu.UncompressedData = unzipped
+ // println("ungzipped data size", len(unzipped))
+ }
+ } else {
+	ext := filepath.Ext(pu.FileName) // the extension (with dot), not the base name
+ mimeType := pu.MimeType
+ if mimeType == "" {
+ mimeType = http.DetectContentType(pu.Data)
+ }
+ // println("detected mimetype to", pu.MimeType)
+ if mimeType == "application/octet-stream" {
+ mimeType = ""
+ }
+	if shouldBeCompressed, iAmSure := util.IsCompressableFileType(ext, mimeType); (mimeType == "" && !iAmSure) || (shouldBeCompressed && iAmSure) {
+ // println("ext", ext, "iAmSure", iAmSure, "shouldBeCompressed", shouldBeCompressed, "mimeType", pu.MimeType)
+ if compressedData, err := util.GzipData(pu.Data); err == nil {
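+	// keep the gzip result only if it shrinks the data by more than 10%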
+ if len(compressedData)*10 < len(pu.Data)*9 {
+ pu.Data = compressedData
+ pu.IsGzipped = true
+ }
+ // println("gzipped data size", len(compressedData))
+ }
+ }
+ }
+
+ // md5
+ h := md5.New()
+ h.Write(pu.UncompressedData)
+ pu.ContentMd5 = base64.StdEncoding.EncodeToString(h.Sum(nil))
+ if expectedChecksum := r.Header.Get("Content-MD5"); expectedChecksum != "" {
+ if expectedChecksum != pu.ContentMd5 {
+ e = fmt.Errorf("Content-MD5 did not match md5 of file data expected [%s] received [%s] size %d", expectedChecksum, pu.ContentMd5, len(pu.UncompressedData))
+ return
+ }
+ }
+
+ return
+}
+
+func parsePut(r *http.Request, sizeLimit int64, pu *ParsedUpload) (e error) {
+ pu.IsGzipped = r.Header.Get("Content-Encoding") == "gzip"
+ // pu.IsZstd = r.Header.Get("Content-Encoding") == "zstd"
+ pu.MimeType = r.Header.Get("Content-Type")
+ pu.FileName = ""
+ pu.Data, e = ioutil.ReadAll(io.LimitReader(r.Body, sizeLimit+1))
+	if e == io.EOF || int64(len(pu.Data)) == sizeLimit+1 { // OriginalDataSize is not set yet here; test the bytes actually read
+ io.Copy(ioutil.Discard, r.Body)
+ }
+ r.Body.Close()
+ return nil
+}
+
+func parseMultipart(r *http.Request, sizeLimit int64, pu *ParsedUpload) (e error) {
+ defer func() {
+ if e != nil && r.Body != nil {
+ io.Copy(ioutil.Discard, r.Body)
+ r.Body.Close()
+ }
+ }()
+ form, fe := r.MultipartReader()
+ if fe != nil {
+ glog.V(0).Infoln("MultipartReader [ERROR]", fe)
+ e = fe
+ return
+ }
+
+ // first multi-part item
+ part, fe := form.NextPart()
+ if fe != nil {
+ glog.V(0).Infoln("Reading Multi part [ERROR]", fe)
+ e = fe
+ return
+ }
+
+ pu.FileName = part.FileName()
+ if pu.FileName != "" {
+ pu.FileName = path.Base(pu.FileName)
+ }
+
+ pu.Data, e = ioutil.ReadAll(io.LimitReader(part, sizeLimit+1))
+ if e != nil {
+ glog.V(0).Infoln("Reading Content [ERROR]", e)
+ return
+ }
+ if len(pu.Data) == int(sizeLimit)+1 {
+ e = fmt.Errorf("file over the limited %d bytes", sizeLimit)
+ return
+ }
+
+ // if the filename is empty string, do a search on the other multi-part items
+ for pu.FileName == "" {
+ part2, fe := form.NextPart()
+ if fe != nil {
+ break // no more or on error, just safely break
+ }
+
+ fName := part2.FileName()
+
+ // found the first <file type> multi-part has filename
+ if fName != "" {
+ data2, fe2 := ioutil.ReadAll(io.LimitReader(part2, sizeLimit+1))
+ if fe2 != nil {
+ glog.V(0).Infoln("Reading Content [ERROR]", fe2)
+ e = fe2
+ return
+ }
+ if len(data2) == int(sizeLimit)+1 {
+ e = fmt.Errorf("file over the limited %d bytes", sizeLimit)
+ return
+ }
+
+ // update
+ pu.Data = data2
+ pu.FileName = path.Base(fName)
+ break
+ }
+ }
+
+ pu.IsChunkedFile, _ = strconv.ParseBool(r.FormValue("cm"))
+
+ if !pu.IsChunkedFile {
+
+ dotIndex := strings.LastIndex(pu.FileName, ".")
+ ext, mtype := "", ""
+ if dotIndex > 0 {
+ ext = strings.ToLower(pu.FileName[dotIndex:])
+ mtype = mime.TypeByExtension(ext)
+ }
+ contentType := part.Header.Get("Content-Type")
+ if contentType != "" && contentType != "application/octet-stream" && mtype != contentType {
+ pu.MimeType = contentType // only return mime type if not deductable
+ mtype = contentType
+ }
+
+ }
+ pu.IsGzipped = part.Header.Get("Content-Encoding") == "gzip"
+ // pu.IsZstd = part.Header.Get("Content-Encoding") == "zstd"
+
+ return
+}
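
Both parsePut and parseMultipart rely on the same trick: read at most sizeLimit+1 bytes through io.LimitReader, then treat a read of exactly sizeLimit+1 bytes as proof that the body was too large. The pattern in isolation, as a standalone sketch:

package main

import (
	"fmt"
	"io"
	"io/ioutil"
	"strings"
)

func readLimited(r io.Reader, sizeLimit int64) ([]byte, error) {
	// read one byte past the limit so an oversized body is detectable
	data, err := ioutil.ReadAll(io.LimitReader(r, sizeLimit+1))
	if err != nil {
		return nil, err
	}
	if int64(len(data)) == sizeLimit+1 {
		return nil, fmt.Errorf("body exceeds the %d byte limit", sizeLimit)
	}
	return data, nil
}

func main() {
	_, err := readLimited(strings.NewReader("0123456789"), 5)
	fmt.Println(err) // body exceeds the 5 byte limit
}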
diff --git a/weed/storage/needle/needle_read_write.go b/weed/storage/needle/needle_read_write.go
index 7f8aa4823..16c2fd06b 100644
--- a/weed/storage/needle/needle_read_write.go
+++ b/weed/storage/needle/needle_read_write.go
@@ -3,17 +3,16 @@ package needle
import (
"errors"
"fmt"
- "io"
- "math"
-
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/storage/backend"
. "github.com/chrislusf/seaweedfs/weed/storage/types"
"github.com/chrislusf/seaweedfs/weed/util"
+ "io"
+ "math"
)
const (
- FlagGzip = 0x01
+ FlagIsCompressed = 0x01
FlagHasName = 0x02
FlagHasMime = 0x04
FlagHasLastModifiedDate = 0x08
@@ -24,11 +23,13 @@ const (
TtlBytesLength = 2
)
+var ErrorSizeMismatch = errors.New("size mismatch")
+
func (n *Needle) DiskSize(version Version) int64 {
return GetActualSize(n.Size, version)
}
-func (n *Needle) prepareWriteBuffer(version Version) ([]byte, uint32, int64, error) {
+func (n *Needle) prepareWriteBuffer(version Version) ([]byte, Size, int64, error) {
writeBytes := make([]byte, 0)
@@ -37,8 +38,8 @@ func (n *Needle) prepareWriteBuffer(version Version) ([]byte, uint32, int64, err
header := make([]byte, NeedleHeaderSize)
CookieToBytes(header[0:CookieSize], n.Cookie)
NeedleIdToBytes(header[CookieSize:CookieSize+NeedleIdSize], n.Id)
- n.Size = uint32(len(n.Data))
- util.Uint32toBytes(header[CookieSize+NeedleIdSize:CookieSize+NeedleIdSize+SizeSize], n.Size)
+ n.Size = Size(len(n.Data))
+ SizeToBytes(header[CookieSize+NeedleIdSize:CookieSize+NeedleIdSize+SizeSize], n.Size)
size := n.Size
actualSize := NeedleHeaderSize + int64(n.Size)
writeBytes = append(writeBytes, header...)
@@ -58,12 +59,12 @@ func (n *Needle) prepareWriteBuffer(version Version) ([]byte, uint32, int64, err
}
n.DataSize, n.MimeSize = uint32(len(n.Data)), uint8(len(n.Mime))
if n.DataSize > 0 {
- n.Size = 4 + n.DataSize + 1
+ n.Size = 4 + Size(n.DataSize) + 1
if n.HasName() {
- n.Size = n.Size + 1 + uint32(n.NameSize)
+ n.Size = n.Size + 1 + Size(n.NameSize)
}
if n.HasMime() {
- n.Size = n.Size + 1 + uint32(n.MimeSize)
+ n.Size = n.Size + 1 + Size(n.MimeSize)
}
if n.HasLastModifiedDate() {
n.Size = n.Size + LastModifiedBytesLength
@@ -72,12 +73,12 @@ func (n *Needle) prepareWriteBuffer(version Version) ([]byte, uint32, int64, err
n.Size = n.Size + TtlBytesLength
}
if n.HasPairs() {
- n.Size += 2 + uint32(n.PairsSize)
+ n.Size += 2 + Size(n.PairsSize)
}
} else {
n.Size = 0
}
- util.Uint32toBytes(header[CookieSize+NeedleIdSize:CookieSize+NeedleIdSize+SizeSize], n.Size)
+ SizeToBytes(header[CookieSize+NeedleIdSize:CookieSize+NeedleIdSize+SizeSize], n.Size)
writeBytes = append(writeBytes, header[0:NeedleHeaderSize]...)
if n.DataSize > 0 {
util.Uint32toBytes(header[0:4], n.DataSize)
@@ -119,13 +120,13 @@ func (n *Needle) prepareWriteBuffer(version Version) ([]byte, uint32, int64, err
writeBytes = append(writeBytes, header[0:NeedleChecksumSize+TimestampSize+padding]...)
}
- return writeBytes, n.DataSize, GetActualSize(n.Size, version), nil
+ return writeBytes, Size(n.DataSize), GetActualSize(n.Size, version), nil
}
return writeBytes, 0, 0, fmt.Errorf("Unsupported Version! (%d)", version)
}
-func (n *Needle) Append(w backend.BackendStorageFile, version Version) (offset uint64, size uint32, actualSize int64, err error) {
+func (n *Needle) Append(w backend.BackendStorageFile, version Version) (offset uint64, size Size, actualSize int64, err error) {
if end, _, e := w.GetStat(); e == nil {
defer func(w backend.BackendStorageFile, off int64) {
@@ -140,6 +141,10 @@ func (n *Needle) Append(w backend.BackendStorageFile, version Version) (offset u
err = fmt.Errorf("Cannot Read Current Volume Position: %v", e)
return
}
+ if offset >= MaxPossibleVolumeSize {
+ err = fmt.Errorf("Volume Size %d Exeededs %d", offset, MaxPossibleVolumeSize)
+ return
+ }
bytesToWrite, size, actualSize, err := n.prepareWriteBuffer(version)
@@ -150,21 +155,63 @@ func (n *Needle) Append(w backend.BackendStorageFile, version Version) (offset u
return offset, size, actualSize, err
}
-func ReadNeedleBlob(r backend.BackendStorageFile, offset int64, size uint32, version Version) (dataSlice []byte, err error) {
+func WriteNeedleBlob(w backend.BackendStorageFile, dataSlice []byte, size Size, appendAtNs uint64, version Version) (offset uint64, err error) {
+
+ if end, _, e := w.GetStat(); e == nil {
+ defer func(w backend.BackendStorageFile, off int64) {
+ if err != nil {
+ if te := w.Truncate(end); te != nil {
+ glog.V(0).Infof("Failed to truncate %s back to %d with error: %v", w.Name(), end, te)
+ }
+ }
+ }(w, end)
+ offset = uint64(end)
+ } else {
+ err = fmt.Errorf("Cannot Read Current Volume Position: %v", e)
+ return
+ }
+
+ if version == Version3 {
+ tsOffset := NeedleHeaderSize + size + NeedleChecksumSize
+ util.Uint64toBytes(dataSlice[tsOffset:tsOffset+TimestampSize], appendAtNs)
+ }
+
+ if err == nil {
+ _, err = w.WriteAt(dataSlice, int64(offset))
+ }
+
+ return
+
+}
+
+func ReadNeedleBlob(r backend.BackendStorageFile, offset int64, size Size, version Version) (dataSlice []byte, err error) {
dataSize := GetActualSize(size, version)
dataSlice = make([]byte, int(dataSize))
- _, err = r.ReadAt(dataSlice, offset)
+ var n int
+ n, err = r.ReadAt(dataSlice, offset)
+ if err != nil && int64(n) == dataSize {
+ err = nil
+ }
+ if err != nil {
+ fileSize, _, _ := r.GetStat()
+ println("n", n, "dataSize", dataSize, "offset", offset, "fileSize", fileSize)
+ }
return dataSlice, err
}
// ReadBytes hydrates the needle from the bytes buffer, with only n.Id is set.
-func (n *Needle) ReadBytes(bytes []byte, offset int64, size uint32, version Version) (err error) {
+func (n *Needle) ReadBytes(bytes []byte, offset int64, size Size, version Version) (err error) {
n.ParseNeedleHeader(bytes)
if n.Size != size {
- return fmt.Errorf("entry not found: offset %d found id %d size %d, expected size %d", offset, n.Id, n.Size, size)
+ // cookie is not always passed in for this API. Use size to do preliminary checking.
+ if OffsetSize == 4 && offset < int64(MaxPossibleVolumeSize) {
+ glog.Errorf("entry not found1: offset %d found id %x size %d, expected size %d", offset, n.Id, n.Size, size)
+ return ErrorSizeMismatch
+ }
+ return fmt.Errorf("entry not found: offset %d found id %x size %d, expected size %d", offset, n.Id, n.Size, size)
}
switch version {
case Version1:
@@ -191,7 +238,7 @@ func (n *Needle) ReadBytes(bytes []byte, offset int64, size uint32, version Vers
}
// ReadData hydrates the needle from the file, with only n.Id is set.
-func (n *Needle) ReadData(r backend.BackendStorageFile, offset int64, size uint32, version Version) (err error) {
+func (n *Needle) ReadData(r backend.BackendStorageFile, offset int64, size Size, version Version) (err error) {
bytes, err := ReadNeedleBlob(r, offset, size, version)
if err != nil {
return err
@@ -202,7 +249,7 @@ func (n *Needle) ReadData(r backend.BackendStorageFile, offset int64, size uint3
func (n *Needle) ParseNeedleHeader(bytes []byte) {
n.Cookie = BytesToCookie(bytes[0:CookieSize])
n.Id = BytesToNeedleId(bytes[CookieSize : CookieSize+NeedleIdSize])
- n.Size = util.BytesToUint32(bytes[CookieSize+NeedleIdSize : NeedleHeaderSize])
+ n.Size = BytesToSize(bytes[CookieSize+NeedleIdSize : NeedleHeaderSize])
}
func (n *Needle) readNeedleDataVersion2(bytes []byte) (err error) {
@@ -284,7 +331,7 @@ func ReadNeedleHeader(r backend.BackendStorageFile, version Version, offset int6
return
}
-func PaddingLength(needleSize uint32, version Version) uint32 {
+func PaddingLength(needleSize Size, version Version) Size {
if version == Version3 {
// this is same value as version2, but just listed here for clarity
return NeedlePaddingSize - ((NeedleHeaderSize + needleSize + NeedleChecksumSize + TimestampSize) % NeedlePaddingSize)
@@ -292,7 +339,7 @@ func PaddingLength(needleSize uint32, version Version) uint32 {
return NeedlePaddingSize - ((NeedleHeaderSize + needleSize + NeedleChecksumSize) % NeedlePaddingSize)
}
-func NeedleBodyLength(needleSize uint32, version Version) int64 {
+func NeedleBodyLength(needleSize Size, version Version) int64 {
if version == Version3 {
return int64(needleSize) + NeedleChecksumSize + TimestampSize + int64(PaddingLength(needleSize, version))
}
@@ -339,11 +386,11 @@ func (n *Needle) ReadNeedleBodyBytes(needleBody []byte, version Version) (err er
return
}
-func (n *Needle) IsGzipped() bool {
- return n.Flags&FlagGzip > 0
+func (n *Needle) IsCompressed() bool {
+ return n.Flags&FlagIsCompressed > 0
}
-func (n *Needle) SetGzipped() {
- n.Flags = n.Flags | FlagGzip
+func (n *Needle) SetIsCompressed() {
+ n.Flags = n.Flags | FlagIsCompressed
}
func (n *Needle) HasName() bool {
return n.Flags&FlagHasName > 0
@@ -386,6 +433,6 @@ func (n *Needle) SetHasPairs() {
n.Flags = n.Flags | FlagHasPairs
}
-func GetActualSize(size uint32, version Version) int64 {
+func GetActualSize(size Size, version Version) int64 {
return NeedleHeaderSize + NeedleBodyLength(size, version)
}
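
With the usual constants from the types package (NeedleHeaderSize=16, NeedleChecksumSize=4, TimestampSize=8, NeedlePaddingSize=8 — stated here as assumptions, since they are not part of this hunk), the version-3 padding formula has one subtlety: an already-aligned record still gets a full 8-byte pad block rather than 0. A worked sketch:

package main

import "fmt"

const (
	needleHeaderSize   = 16 // cookie(4) + id(8) + size(4); assumed from the types package
	needleChecksumSize = 4
	timestampSize      = 8
	needlePaddingSize  = 8
)

func paddingLengthV3(needleSize int64) int64 {
	return needlePaddingSize - ((needleHeaderSize + needleSize + needleChecksumSize + timestampSize) % needlePaddingSize)
}

func main() {
	fmt.Println(paddingLengthV3(100)) // 8: 16+100+4+8=128 is aligned, yet a full pad block is added
	fmt.Println(paddingLengthV3(101)) // 7: 129 % 8 = 1, so 7 bytes bring it back to alignment
}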
diff --git a/weed/storage/needle/needle_read_write_test.go b/weed/storage/needle/needle_read_write_test.go
index 47582dd26..afcea5a05 100644
--- a/weed/storage/needle/needle_read_write_test.go
+++ b/weed/storage/needle/needle_read_write_test.go
@@ -48,7 +48,7 @@ func TestAppend(t *testing.T) {
int64 : -9223372036854775808 to 9223372036854775807
*/
- fileSize := int64(4294967295) + 10000
+ fileSize := int64(4294967296) + 10000
tempFile.Truncate(fileSize)
defer func() {
tempFile.Close()
diff --git a/weed/storage/needle/volume_ttl.go b/weed/storage/needle/volume_ttl.go
index 4a169870d..d0de3768e 100644
--- a/weed/storage/needle/volume_ttl.go
+++ b/weed/storage/needle/volume_ttl.go
@@ -1,11 +1,12 @@
package needle
import (
+ "fmt"
"strconv"
)
const (
- //stored unit types
+ // stored unit types
Empty byte = iota
Minute
Hour
@@ -69,6 +70,9 @@ func (t *TTL) ToBytes(output []byte) {
}
func (t *TTL) ToUint32() (output uint32) {
+ if t == nil || t.Count == 0 {
+ return 0
+ }
output = uint32(t.Count) << 8
output += uint32(t.Unit)
return output
@@ -130,9 +134,49 @@ func (t TTL) Minutes() uint32 {
case Week:
return uint32(t.Count) * 60 * 24 * 7
case Month:
- return uint32(t.Count) * 60 * 24 * 31
+ return uint32(t.Count) * 60 * 24 * 30
case Year:
return uint32(t.Count) * 60 * 24 * 365
}
return 0
}
+
+func SecondsToTTL(seconds int32) string {
+ if seconds == 0 {
+ return ""
+ }
+ if seconds%(3600*24*365) == 0 && seconds/(3600*24*365) < 256 {
+ return fmt.Sprintf("%dy", seconds/(3600*24*365))
+ }
+ if seconds%(3600*24*30) == 0 && seconds/(3600*24*30) < 256 {
+ return fmt.Sprintf("%dM", seconds/(3600*24*30))
+ }
+ if seconds%(3600*24*7) == 0 && seconds/(3600*24*7) < 256 {
+ return fmt.Sprintf("%dw", seconds/(3600*24*7))
+ }
+ if seconds%(3600*24) == 0 && seconds/(3600*24) < 256 {
+ return fmt.Sprintf("%dd", seconds/(3600*24))
+ }
+ if seconds%(3600) == 0 && seconds/(3600) < 256 {
+ return fmt.Sprintf("%dh", seconds/(3600))
+ }
+ if seconds/60 < 256 {
+ return fmt.Sprintf("%dm", seconds/60)
+ }
+ if seconds/(3600) < 256 {
+ return fmt.Sprintf("%dh", seconds/(3600))
+ }
+ if seconds/(3600*24) < 256 {
+ return fmt.Sprintf("%dd", seconds/(3600*24))
+ }
+ if seconds/(3600*24*7) < 256 {
+ return fmt.Sprintf("%dw", seconds/(3600*24*7))
+ }
+ if seconds/(3600*24*30) < 256 {
+ return fmt.Sprintf("%dM", seconds/(3600*24*30))
+ }
+ if seconds/(3600*24*365) < 256 {
+ return fmt.Sprintf("%dy", seconds/(3600*24*365))
+ }
+ return ""
+}
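
SecondsToTTL prefers exact unit matches (year down to hour) before falling back to the largest unit whose count fits in a byte, so the conversion can round. A short sketch of both behaviors:

package main

import (
	"fmt"

	"github.com/chrislusf/seaweedfs/weed/storage/needle"
)

func main() {
	fmt.Println(needle.SecondsToTTL(3600 * 24 * 30)) // "1M": exact month match
	fmt.Println(needle.SecondsToTTL(90))             // "1m": 90s rounds down to one minute
}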
diff --git a/weed/storage/needle/volume_ttl_test.go b/weed/storage/needle/volume_ttl_test.go
index 0afebebf5..150d06e6e 100644
--- a/weed/storage/needle/volume_ttl_test.go
+++ b/weed/storage/needle/volume_ttl_test.go
@@ -30,13 +30,18 @@ func TestTTLReadWrite(t *testing.T) {
t.Errorf("5d ttl:%v", ttl)
}
+ ttl, _ = ReadTTL("50d")
+ if ttl.Minutes() != 50*24*60 {
+ t.Errorf("50d ttl:%v", ttl)
+ }
+
ttl, _ = ReadTTL("5w")
if ttl.Minutes() != 5*7*24*60 {
t.Errorf("5w ttl:%v", ttl)
}
ttl, _ = ReadTTL("5M")
- if ttl.Minutes() != 5*31*24*60 {
+ if ttl.Minutes() != 5*30*24*60 {
t.Errorf("5M ttl:%v", ttl)
}
diff --git a/weed/storage/needle_map.go b/weed/storage/needle_map.go
index 77d081ea7..d35391f66 100644
--- a/weed/storage/needle_map.go
+++ b/weed/storage/needle_map.go
@@ -1,25 +1,26 @@
package storage
import (
- "fmt"
+ "io"
"os"
"sync"
+ "github.com/chrislusf/seaweedfs/weed/storage/idx"
"github.com/chrislusf/seaweedfs/weed/storage/needle_map"
. "github.com/chrislusf/seaweedfs/weed/storage/types"
)
-type NeedleMapType int
+type NeedleMapKind int
const (
- NeedleMapInMemory NeedleMapType = iota
+ NeedleMapInMemory NeedleMapKind = iota
NeedleMapLevelDb // small memory footprint, 4MB total, 1 write buffer, 3 block buffer
NeedleMapLevelDbMedium // medium memory footprint, 8MB total, 3 write buffer, 5 block buffer
NeedleMapLevelDbLarge // large memory footprint, 12MB total, 4 write buffer, 8 block buffer
)
type NeedleMapper interface {
- Put(key NeedleId, offset Offset, size uint32) error
+ Put(key NeedleId, offset Offset, size Size) error
Get(key NeedleId) (element *needle_map.NeedleValue, ok bool)
Delete(key NeedleId, offset Offset) error
Close()
@@ -30,6 +31,8 @@ type NeedleMapper interface {
DeletedCount() int
MaxFileKey() NeedleId
IndexFileSize() uint64
+ Sync() error
+ ReadIndexEntry(n int64) (key NeedleId, offset Offset, size Size, err error)
}
type baseNeedleMapper struct {
@@ -37,6 +40,7 @@ type baseNeedleMapper struct {
indexFile *os.File
indexFileAccessLock sync.Mutex
+ indexFileOffset int64
}
func (nm *baseNeedleMapper) IndexFileSize() uint64 {
@@ -47,15 +51,35 @@ func (nm *baseNeedleMapper) IndexFileSize() uint64 {
return 0
}
-func (nm *baseNeedleMapper) appendToIndexFile(key NeedleId, offset Offset, size uint32) error {
+func (nm *baseNeedleMapper) appendToIndexFile(key NeedleId, offset Offset, size Size) error {
bytes := needle_map.ToBytes(key, offset, size)
nm.indexFileAccessLock.Lock()
defer nm.indexFileAccessLock.Unlock()
- if _, err := nm.indexFile.Seek(0, 2); err != nil {
- return fmt.Errorf("cannot seek end of indexfile %s: %v",
- nm.indexFile.Name(), err)
+ written, err := nm.indexFile.WriteAt(bytes, nm.indexFileOffset)
+ if err == nil {
+ nm.indexFileOffset += int64(written)
}
- _, err := nm.indexFile.Write(bytes)
return err
}
+
+func (nm *baseNeedleMapper) Sync() error {
+ return nm.indexFile.Sync()
+}
+
+func (nm *baseNeedleMapper) ReadIndexEntry(n int64) (key NeedleId, offset Offset, size Size, err error) {
+ bytes := make([]byte, NeedleMapEntrySize)
+ var readCount int
+ if readCount, err = nm.indexFile.ReadAt(bytes, n*NeedleMapEntrySize); err != nil {
+ if err == io.EOF {
+ if readCount == NeedleMapEntrySize {
+ err = nil
+ }
+ }
+ if err != nil {
+ return
+ }
+ }
+ key, offset, size = idx.IdxFileEntry(bytes)
+ return
+}
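
appendToIndexFile now tracks its own write offset and uses WriteAt, instead of seeking to the end before every append; the append position stays correct even if something else moves the file's read/write cursor. The same pattern, reduced to a standalone sketch:

package main

import (
	"fmt"
	"io/ioutil"
	"os"
)

type appender struct {
	f   *os.File
	off int64 // next write position, tracked instead of seeking every time
}

func (a *appender) append(b []byte) error {
	n, err := a.f.WriteAt(b, a.off)
	if err == nil {
		a.off += int64(n)
	}
	return err
}

func main() {
	f, _ := ioutil.TempFile("", "idx")
	defer os.Remove(f.Name())
	a := &appender{f: f}
	a.append([]byte("entry1"))
	a.append([]byte("entry2"))
	fmt.Println(a.off) // 12
}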
diff --git a/weed/storage/needle_map/compact_map.go b/weed/storage/needle_map/compact_map.go
index 76783d0b0..2b1a471bc 100644
--- a/weed/storage/needle_map/compact_map.go
+++ b/weed/storage/needle_map/compact_map.go
@@ -18,7 +18,7 @@ const SectionalNeedleIdLimit = 1<<32 - 1
type SectionalNeedleValue struct {
Key SectionalNeedleId
OffsetLower OffsetLower `comment:"Volume offset"` //since aligned to 8 bytes, range is 4G*8=32G
- Size uint32 `comment:"Size of the data portion"`
+ Size Size `comment:"Size of the data portion"`
}
type SectionalNeedleValueExtra struct {
@@ -50,7 +50,7 @@ func NewCompactSection(start NeedleId) *CompactSection {
}
//return old entry size
-func (cs *CompactSection) Set(key NeedleId, offset Offset, size uint32) (oldOffset Offset, oldSize uint32) {
+func (cs *CompactSection) Set(key NeedleId, offset Offset, size Size) (oldOffset Offset, oldSize Size) {
cs.Lock()
if key > cs.end {
cs.end = key
@@ -80,7 +80,7 @@ func (cs *CompactSection) Set(key NeedleId, offset Offset, size uint32) (oldOffs
return
}
-func (cs *CompactSection) setOverflowEntry(skey SectionalNeedleId, offset Offset, size uint32) {
+func (cs *CompactSection) setOverflowEntry(skey SectionalNeedleId, offset Offset, size Size) {
needleValue := SectionalNeedleValue{Key: skey, OffsetLower: offset.OffsetLower, Size: size}
needleValueExtra := SectionalNeedleValueExtra{OffsetHigher: offset.OffsetHigher}
insertCandidate := sort.Search(len(cs.overflow), func(i int) bool {
@@ -115,24 +115,21 @@ func (cs *CompactSection) deleteOverflowEntry(key SectionalNeedleId) {
return cs.overflow[i].Key >= key
})
if deleteCandidate != length && cs.overflow[deleteCandidate].Key == key {
- for i := deleteCandidate; i < length-1; i++ {
- cs.overflow[i] = cs.overflow[i+1]
- cs.overflowExtra[i] = cs.overflowExtra[i+1]
+ if cs.overflow[deleteCandidate].Size.IsValid() {
+ cs.overflow[deleteCandidate].Size = -cs.overflow[deleteCandidate].Size
}
- cs.overflow = cs.overflow[0 : length-1]
- cs.overflowExtra = cs.overflowExtra[0 : length-1]
}
}
//return old entry size
-func (cs *CompactSection) Delete(key NeedleId) uint32 {
+func (cs *CompactSection) Delete(key NeedleId) Size {
skey := SectionalNeedleId(key - cs.start)
cs.Lock()
- ret := uint32(0)
+ ret := Size(0)
if i := cs.binarySearchValues(skey); i >= 0 {
- if cs.values[i].Size > 0 && cs.values[i].Size != TombstoneFileSize {
+ if cs.values[i].Size > 0 && cs.values[i].Size.IsValid() {
ret = cs.values[i].Size
- cs.values[i].Size = TombstoneFileSize
+ cs.values[i].Size = -cs.values[i].Size
}
}
if _, v, found := cs.findOverflowEntry(skey); found {
@@ -181,7 +178,7 @@ func NewCompactMap() *CompactMap {
return &CompactMap{}
}
-func (cm *CompactMap) Set(key NeedleId, offset Offset, size uint32) (oldOffset Offset, oldSize uint32) {
+func (cm *CompactMap) Set(key NeedleId, offset Offset, size Size) (oldOffset Offset, oldSize Size) {
x := cm.binarySearchCompactSection(key)
if x < 0 || (key-cm.list[x].start) > SectionalNeedleIdLimit {
// println(x, "adding to existing", len(cm.list), "sections, starting", key)
@@ -204,10 +201,10 @@ func (cm *CompactMap) Set(key NeedleId, offset Offset, size uint32) (oldOffset O
// println(key, "set to section[", x, "].start", cm.list[x].start)
return cm.list[x].Set(key, offset, size)
}
-func (cm *CompactMap) Delete(key NeedleId) uint32 {
+func (cm *CompactMap) Delete(key NeedleId) Size {
x := cm.binarySearchCompactSection(key)
if x < 0 {
- return uint32(0)
+ return Size(0)
}
return cm.list[x].Delete(key)
}
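
The compact map no longer physically removes overflow entries on delete; it negates Size, turning the entry into a tombstone while keeping slice positions stable. A toy model of the convention — the real Size type and its IsValid/IsDeleted helpers live in storage/types, so this is only an illustration:

package main

import "fmt"

type Size int32 // signed, so a negative value can mark a deleted needle

func (s Size) IsDeleted() bool { return s < 0 }
func (s Size) IsValid() bool   { return s > 0 }

func main() {
	s := Size(1024)
	s = -s // delete: keep the magnitude, flip the sign
	fmt.Println(s.IsDeleted(), s.IsValid()) // true false
}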
diff --git a/weed/storage/needle_map/compact_map_perf_test.go b/weed/storage/needle_map/compact_map_perf_test.go
index 3a3648641..081fb34e9 100644
--- a/weed/storage/needle_map/compact_map_perf_test.go
+++ b/weed/storage/needle_map/compact_map_perf_test.go
@@ -9,7 +9,6 @@ import (
"time"
. "github.com/chrislusf/seaweedfs/weed/storage/types"
- "github.com/chrislusf/seaweedfs/weed/util"
)
/*
@@ -32,7 +31,7 @@ func TestMemoryUsage(t *testing.T) {
startTime := time.Now()
for i := 0; i < 10; i++ {
- indexFile, ie := os.OpenFile("../../../test/sample.idx", os.O_RDWR|os.O_RDONLY, 0644)
+ indexFile, ie := os.OpenFile("../../../test/data/sample.idx", os.O_RDWR|os.O_RDONLY, 0644)
if ie != nil {
log.Fatalln(ie)
}
@@ -60,7 +59,7 @@ func loadNewNeedleMap(file *os.File) (*CompactMap, uint64) {
rowCount++
key := BytesToNeedleId(bytes[i : i+NeedleIdSize])
offset := BytesToOffset(bytes[i+NeedleIdSize : i+NeedleIdSize+OffsetSize])
- size := util.BytesToUint32(bytes[i+NeedleIdSize+OffsetSize : i+NeedleIdSize+OffsetSize+SizeSize])
+ size := BytesToSize(bytes[i+NeedleIdSize+OffsetSize : i+NeedleIdSize+OffsetSize+SizeSize])
if !offset.IsZero() {
m.Set(NeedleId(key), offset, size)
diff --git a/weed/storage/needle_map/compact_map_test.go b/weed/storage/needle_map/compact_map_test.go
index 7eea3969a..199cb26b3 100644
--- a/weed/storage/needle_map/compact_map_test.go
+++ b/weed/storage/needle_map/compact_map_test.go
@@ -49,7 +49,7 @@ func TestIssue52(t *testing.T) {
func TestCompactMap(t *testing.T) {
m := NewCompactMap()
for i := uint32(0); i < 100*batch; i += 2 {
- m.Set(NeedleId(i), ToOffset(int64(i)), i)
+ m.Set(NeedleId(i), ToOffset(int64(i)), Size(i))
}
for i := uint32(0); i < 100*batch; i += 37 {
@@ -57,7 +57,7 @@ func TestCompactMap(t *testing.T) {
}
for i := uint32(0); i < 10*batch; i += 3 {
- m.Set(NeedleId(i), ToOffset(int64(i+11)), i+5)
+ m.Set(NeedleId(i), ToOffset(int64(i+11)), Size(i+5))
}
// for i := uint32(0); i < 100; i++ {
@@ -72,15 +72,15 @@ func TestCompactMap(t *testing.T) {
if !ok {
t.Fatal("key", i, "missing!")
}
- if v.Size != i+5 {
+ if v.Size != Size(i+5) {
t.Fatal("key", i, "size", v.Size)
}
} else if i%37 == 0 {
- if ok && v.Size != TombstoneFileSize {
+ if ok && v.Size.IsValid() {
t.Fatal("key", i, "should have been deleted needle value", v)
}
} else if i%2 == 0 {
- if v.Size != i {
+ if v.Size != Size(i) {
t.Fatal("key", i, "size", v.Size)
}
}
@@ -89,14 +89,14 @@ func TestCompactMap(t *testing.T) {
for i := uint32(10 * batch); i < 100*batch; i++ {
v, ok := m.Get(NeedleId(i))
if i%37 == 0 {
- if ok && v.Size != TombstoneFileSize {
+ if ok && v.Size.IsValid() {
t.Fatal("key", i, "should have been deleted needle value", v)
}
} else if i%2 == 0 {
if v == nil {
t.Fatal("key", i, "missing")
}
- if v.Size != i {
+ if v.Size != Size(i) {
t.Fatal("key", i, "size", v.Size)
}
}
@@ -129,8 +129,8 @@ func TestOverflow(t *testing.T) {
cs.deleteOverflowEntry(4)
- if len(cs.overflow) != 4 {
- t.Fatalf("expecting 4 entries now: %+v", cs.overflow)
+ if len(cs.overflow) != 5 {
+ t.Fatalf("expecting 5 entries now: %+v", cs.overflow)
}
_, x, _ := cs.findOverflowEntry(5)
@@ -146,7 +146,7 @@ func TestOverflow(t *testing.T) {
cs.deleteOverflowEntry(1)
for i, x := range cs.overflow {
- println("overflow[", i, "]:", x.Key)
+ println("overflow[", i, "]:", x.Key, "size", x.Size)
}
println()
diff --git a/weed/storage/needle_map/memdb.go b/weed/storage/needle_map/memdb.go
index 6aba6adeb..ba1fd3d1e 100644
--- a/weed/storage/needle_map/memdb.go
+++ b/weed/storage/needle_map/memdb.go
@@ -2,6 +2,7 @@ package needle_map
import (
"fmt"
+ "io"
"os"
"github.com/syndtr/goleveldb/leveldb"
@@ -11,7 +12,6 @@ import (
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/storage/idx"
. "github.com/chrislusf/seaweedfs/weed/storage/types"
- "github.com/chrislusf/seaweedfs/weed/util"
)
//This map uses in memory level db
@@ -32,7 +32,7 @@ func NewMemDb() *MemDb {
return t
}
-func (cm *MemDb) Set(key NeedleId, offset Offset, size uint32) error {
+func (cm *MemDb) Set(key NeedleId, offset Offset, size Size) error {
bytes := ToBytes(key, offset, size)
@@ -56,7 +56,7 @@ func (cm *MemDb) Get(key NeedleId) (*NeedleValue, bool) {
return nil, false
}
offset := BytesToOffset(data[0:OffsetSize])
- size := util.BytesToUint32(data[OffsetSize : OffsetSize+SizeSize])
+ size := BytesToSize(data[OffsetSize : OffsetSize+SizeSize])
return &NeedleValue{Key: key, Offset: offset, Size: size}, true
}
@@ -67,7 +67,7 @@ func (cm *MemDb) AscendingVisit(visit func(NeedleValue) error) (ret error) {
key := BytesToNeedleId(iter.Key())
data := iter.Value()
offset := BytesToOffset(data[0:OffsetSize])
- size := util.BytesToUint32(data[OffsetSize : OffsetSize+SizeSize])
+ size := BytesToSize(data[OffsetSize : OffsetSize+SizeSize])
needle := NeedleValue{Key: key, Offset: offset, Size: size}
ret = visit(needle)
@@ -89,6 +89,9 @@ func (cm *MemDb) SaveToIdx(idxName string) (ret error) {
defer idxFile.Close()
return cm.AscendingVisit(func(value NeedleValue) error {
+ if value.Offset.IsZero() || value.Size.IsDeleted() {
+ return nil
+ }
_, err := idxFile.Write(value.ToBytes())
return err
})
@@ -102,11 +105,21 @@ func (cm *MemDb) LoadFromIdx(idxName string) (ret error) {
}
defer idxFile.Close()
- return idx.WalkIndexFile(idxFile, func(key NeedleId, offset Offset, size uint32) error {
- if offset.IsZero() || size == TombstoneFileSize {
- return nil
+ return cm.LoadFromReaderAt(idxFile)
+
+}
+
+func (cm *MemDb) LoadFromReaderAt(readerAt io.ReaderAt) (ret error) {
+
+ return idx.WalkIndexFile(readerAt, func(key NeedleId, offset Offset, size Size) error {
+ if offset.IsZero() || size.IsDeleted() {
+ return cm.Delete(key)
}
return cm.Set(key, offset, size)
})
}
+
+func (cm *MemDb) Close() {
+ cm.db.Close()
+}
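
LoadFromIdx is now a thin wrapper over LoadFromReaderAt, which accepts any io.ReaderAt — so an index can be loaded straight from memory, not only from a file. A minimal sketch; idxBytes stands in for the contents of a .idx file fetched from a peer:

package main

import (
	"bytes"

	"github.com/chrislusf/seaweedfs/weed/storage/needle_map"
)

func main() {
	db := needle_map.NewMemDb()
	defer db.Close()
	idxBytes := []byte{} // placeholder for real index bytes
	if err := db.LoadFromReaderAt(bytes.NewReader(idxBytes)); err != nil {
		panic(err)
	}
}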
diff --git a/weed/storage/needle_map/memdb_test.go b/weed/storage/needle_map/memdb_test.go
new file mode 100644
index 000000000..7b45d23f8
--- /dev/null
+++ b/weed/storage/needle_map/memdb_test.go
@@ -0,0 +1,23 @@
+package needle_map
+
+import (
+ "testing"
+
+ "github.com/chrislusf/seaweedfs/weed/storage/types"
+)
+
+func BenchmarkMemDb(b *testing.B) {
+ b.ReportAllocs()
+ for i := 0; i < b.N; i++ {
+ nm := NewMemDb()
+
+ nid := types.NeedleId(345)
+ offset := types.Offset{
+ OffsetHigher: types.OffsetHigher{},
+ OffsetLower: types.OffsetLower{},
+ }
+ nm.Set(nid, offset, 324)
+ nm.Close()
+ }
+
+}
diff --git a/weed/storage/needle_map/needle_value.go b/weed/storage/needle_map/needle_value.go
index ef540b55e..f8d614660 100644
--- a/weed/storage/needle_map/needle_value.go
+++ b/weed/storage/needle_map/needle_value.go
@@ -9,7 +9,7 @@ import (
type NeedleValue struct {
Key NeedleId
Offset Offset `comment:"Volume offset"` //since aligned to 8 bytes, range is 4G*8=32G
- Size uint32 `comment:"Size of the data portion"`
+ Size Size `comment:"Size of the data portion"`
}
func (this NeedleValue) Less(than btree.Item) bool {
@@ -21,10 +21,10 @@ func (nv NeedleValue) ToBytes() []byte {
return ToBytes(nv.Key, nv.Offset, nv.Size)
}
-func ToBytes(key NeedleId, offset Offset, size uint32) []byte {
+func ToBytes(key NeedleId, offset Offset, size Size) []byte {
bytes := make([]byte, NeedleIdSize+OffsetSize+SizeSize)
NeedleIdToBytes(bytes[0:NeedleIdSize], key)
OffsetToBytes(bytes[NeedleIdSize:NeedleIdSize+OffsetSize], offset)
- util.Uint32toBytes(bytes[NeedleIdSize+OffsetSize:NeedleIdSize+OffsetSize+SizeSize], size)
+ util.Uint32toBytes(bytes[NeedleIdSize+OffsetSize:NeedleIdSize+OffsetSize+SizeSize], uint32(size))
return bytes
}
diff --git a/weed/storage/needle_map/needle_value_map.go b/weed/storage/needle_map/needle_value_map.go
index 0a5a00ef7..a30cb96c4 100644
--- a/weed/storage/needle_map/needle_value_map.go
+++ b/weed/storage/needle_map/needle_value_map.go
@@ -5,8 +5,8 @@ import (
)
type NeedleValueMap interface {
- Set(key NeedleId, offset Offset, size uint32) (oldOffset Offset, oldSize uint32)
- Delete(key NeedleId) uint32
+ Set(key NeedleId, offset Offset, size Size) (oldOffset Offset, oldSize Size)
+ Delete(key NeedleId) Size
Get(key NeedleId) (*NeedleValue, bool)
AscendingVisit(visit func(NeedleValue) error) error
}
diff --git a/weed/storage/needle_map_leveldb.go b/weed/storage/needle_map_leveldb.go
index ef8571e83..31c86d124 100644
--- a/weed/storage/needle_map_leveldb.go
+++ b/weed/storage/needle_map_leveldb.go
@@ -5,14 +5,16 @@ import (
"os"
"path/filepath"
- "github.com/chrislusf/seaweedfs/weed/storage/idx"
+ "github.com/syndtr/goleveldb/leveldb/errors"
"github.com/syndtr/goleveldb/leveldb/opt"
+ "github.com/chrislusf/seaweedfs/weed/storage/idx"
+
+ "github.com/syndtr/goleveldb/leveldb"
+
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/storage/needle_map"
. "github.com/chrislusf/seaweedfs/weed/storage/types"
- "github.com/chrislusf/seaweedfs/weed/util"
- "github.com/syndtr/goleveldb/leveldb"
)
type LevelDbNeedleMap struct {
@@ -25,14 +27,24 @@ func NewLevelDbNeedleMap(dbFileName string, indexFile *os.File, opts *opt.Option
m = &LevelDbNeedleMap{dbFileName: dbFileName}
m.indexFile = indexFile
if !isLevelDbFresh(dbFileName, indexFile) {
- glog.V(0).Infof("Start to Generate %s from %s", dbFileName, indexFile.Name())
+ glog.V(1).Infof("Start to Generate %s from %s", dbFileName, indexFile.Name())
generateLevelDbFile(dbFileName, indexFile)
- glog.V(0).Infof("Finished Generating %s from %s", dbFileName, indexFile.Name())
+ glog.V(1).Infof("Finished Generating %s from %s", dbFileName, indexFile.Name())
+ }
+ if stat, err := indexFile.Stat(); err != nil {
+ glog.Fatalf("stat file %s: %v", indexFile.Name(), err)
+ } else {
+ m.indexFileOffset = stat.Size()
}
glog.V(1).Infof("Opening %s...", dbFileName)
if m.db, err = leveldb.OpenFile(dbFileName, opts); err != nil {
- return
+ if errors.IsCorrupted(err) {
+ m.db, err = leveldb.RecoverFile(dbFileName, opts)
+ }
+ if err != nil {
+ return
+ }
}
glog.V(1).Infof("Loading %s...", indexFile.Name())
mm, indexLoadError := newNeedleMapMetricFromIndexFile(indexFile)
@@ -66,8 +78,8 @@ func generateLevelDbFile(dbFileName string, indexFile *os.File) error {
return err
}
defer db.Close()
- return idx.WalkIndexFile(indexFile, func(key NeedleId, offset Offset, size uint32) error {
- if !offset.IsZero() && size != TombstoneFileSize {
+ return idx.WalkIndexFile(indexFile, func(key NeedleId, offset Offset, size Size) error {
+ if !offset.IsZero() && size.IsValid() {
levelDbWrite(db, key, offset, size)
} else {
levelDbDelete(db, key)
@@ -84,12 +96,12 @@ func (m *LevelDbNeedleMap) Get(key NeedleId) (element *needle_map.NeedleValue, o
return nil, false
}
offset := BytesToOffset(data[0:OffsetSize])
- size := util.BytesToUint32(data[OffsetSize : OffsetSize+SizeSize])
+ size := BytesToSize(data[OffsetSize : OffsetSize+SizeSize])
return &needle_map.NeedleValue{Key: key, Offset: offset, Size: size}, true
}
-func (m *LevelDbNeedleMap) Put(key NeedleId, offset Offset, size uint32) error {
- var oldSize uint32
+func (m *LevelDbNeedleMap) Put(key NeedleId, offset Offset, size Size) error {
+ var oldSize Size
if oldNeedle, ok := m.Get(key); ok {
oldSize = oldNeedle.Size
}
@@ -101,7 +113,7 @@ func (m *LevelDbNeedleMap) Put(key NeedleId, offset Offset, size uint32) error {
return levelDbWrite(m.db, key, offset, size)
}
-func levelDbWrite(db *leveldb.DB, key NeedleId, offset Offset, size uint32) error {
+func levelDbWrite(db *leveldb.DB, key NeedleId, offset Offset, size Size) error {
bytes := needle_map.ToBytes(key, offset, size)
@@ -117,19 +129,34 @@ func levelDbDelete(db *leveldb.DB, key NeedleId) error {
}
func (m *LevelDbNeedleMap) Delete(key NeedleId, offset Offset) error {
- if oldNeedle, ok := m.Get(key); ok {
- m.logDelete(oldNeedle.Size)
+ oldNeedle, found := m.Get(key)
+ if !found || oldNeedle.Size.IsDeleted() {
+ return nil
}
+ m.logDelete(oldNeedle.Size)
+
// write to index file first
if err := m.appendToIndexFile(key, offset, TombstoneFileSize); err != nil {
return err
}
- return levelDbDelete(m.db, key)
+
+ return levelDbWrite(m.db, key, oldNeedle.Offset, -oldNeedle.Size)
}
func (m *LevelDbNeedleMap) Close() {
- m.indexFile.Close()
- m.db.Close()
+ indexFileName := m.indexFile.Name()
+ if err := m.indexFile.Sync(); err != nil {
+ glog.Warningf("sync file %s failed: %v", indexFileName, err)
+ }
+ if err := m.indexFile.Close(); err != nil {
+ glog.Warningf("close index file %s failed: %v", indexFileName, err)
+ }
+
+ if m.db != nil {
+ if err := m.db.Close(); err != nil {
+ glog.Warningf("close levelDB failed: %v", err)
+ }
+ }
}
func (m *LevelDbNeedleMap) Destroy() error {
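
Opening the LevelDB now falls back to RecoverFile when the store is corrupted (for example after a crash). goleveldb's errors.IsCorrupted and RecoverFile are real APIs; isolated from the surrounding map logic, the pattern looks like this:

package main

import (
	"fmt"

	"github.com/syndtr/goleveldb/leveldb"
	lverrors "github.com/syndtr/goleveldb/leveldb/errors"
	"github.com/syndtr/goleveldb/leveldb/opt"
)

func openWithRecovery(path string, opts *opt.Options) (*leveldb.DB, error) {
	db, err := leveldb.OpenFile(path, opts)
	if lverrors.IsCorrupted(err) {
		// salvage whatever survives in a corrupted store instead of failing outright
		db, err = leveldb.RecoverFile(path, opts)
	}
	return db, err
}

func main() {
	db, err := openWithRecovery("/tmp/demo.ldb", nil)
	if err == nil {
		fmt.Println("opened")
		db.Close()
	}
}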
diff --git a/weed/storage/needle_map_memory.go b/weed/storage/needle_map_memory.go
index 37dee7889..1b58708c6 100644
--- a/weed/storage/needle_map_memory.go
+++ b/weed/storage/needle_map_memory.go
@@ -19,6 +19,11 @@ func NewCompactNeedleMap(file *os.File) *NeedleMap {
m: needle_map.NewCompactMap(),
}
nm.indexFile = file
+ stat, err := file.Stat()
+ if err != nil {
+ glog.Fatalf("stat file %s: %v", file.Name(), err)
+ }
+ nm.indexFileOffset = stat.Size()
return nm
}
@@ -28,13 +33,13 @@ func LoadCompactNeedleMap(file *os.File) (*NeedleMap, error) {
}
func doLoading(file *os.File, nm *NeedleMap) (*NeedleMap, error) {
- e := idx.WalkIndexFile(file, func(key NeedleId, offset Offset, size uint32) error {
+ e := idx.WalkIndexFile(file, func(key NeedleId, offset Offset, size Size) error {
nm.MaybeSetMaxFileKey(key)
- if !offset.IsZero() && size != TombstoneFileSize {
+ if !offset.IsZero() && size.IsValid() {
nm.FileCounter++
nm.FileByteCounter = nm.FileByteCounter + uint64(size)
oldOffset, oldSize := nm.m.Set(NeedleId(key), offset, size)
- if !oldOffset.IsZero() && oldSize != TombstoneFileSize {
+ if !oldOffset.IsZero() && oldSize.IsValid() {
nm.DeletionCounter++
nm.DeletionByteCounter = nm.DeletionByteCounter + uint64(oldSize)
}
@@ -49,7 +54,7 @@ func doLoading(file *os.File, nm *NeedleMap) (*NeedleMap, error) {
return nm, e
}
-func (nm *NeedleMap) Put(key NeedleId, offset Offset, size uint32) error {
+func (nm *NeedleMap) Put(key NeedleId, offset Offset, size Size) error {
_, oldSize := nm.m.Set(NeedleId(key), offset, size)
nm.logPut(key, oldSize, size)
return nm.appendToIndexFile(key, offset, size)
@@ -64,6 +69,10 @@ func (nm *NeedleMap) Delete(key NeedleId, offset Offset) error {
return nm.appendToIndexFile(key, offset, TombstoneFileSize)
}
func (nm *NeedleMap) Close() {
+ indexFileName := nm.indexFile.Name()
+ if err := nm.indexFile.Sync(); err != nil {
+ glog.Warningf("sync file %s failed, %v", indexFileName, err)
+ }
_ = nm.indexFile.Close()
}
func (nm *NeedleMap) Destroy() error {
diff --git a/weed/storage/needle_map_metric.go b/weed/storage/needle_map_metric.go
index 823a04108..3618dada9 100644
--- a/weed/storage/needle_map_metric.go
+++ b/weed/storage/needle_map_metric.go
@@ -18,31 +18,31 @@ type mapMetric struct {
MaximumFileKey uint64 `json:"MaxFileKey"`
}
-func (mm *mapMetric) logDelete(deletedByteCount uint32) {
+func (mm *mapMetric) logDelete(deletedByteCount Size) {
if mm == nil {
return
}
mm.LogDeletionCounter(deletedByteCount)
}
-func (mm *mapMetric) logPut(key NeedleId, oldSize uint32, newSize uint32) {
+func (mm *mapMetric) logPut(key NeedleId, oldSize Size, newSize Size) {
if mm == nil {
return
}
mm.MaybeSetMaxFileKey(key)
mm.LogFileCounter(newSize)
- if oldSize > 0 && oldSize != TombstoneFileSize {
+ if oldSize > 0 && oldSize.IsValid() {
mm.LogDeletionCounter(oldSize)
}
}
-func (mm *mapMetric) LogFileCounter(newSize uint32) {
+func (mm *mapMetric) LogFileCounter(newSize Size) {
if mm == nil {
return
}
atomic.AddUint32(&mm.FileCounter, 1)
atomic.AddUint64(&mm.FileByteCounter, uint64(newSize))
}
-func (mm *mapMetric) LogDeletionCounter(oldSize uint32) {
+func (mm *mapMetric) LogDeletionCounter(oldSize Size) {
if mm == nil {
return
}
@@ -97,11 +97,11 @@ func newNeedleMapMetricFromIndexFile(r *os.File) (mm *mapMetric, err error) {
buf := make([]byte, NeedleIdSize)
err = reverseWalkIndexFile(r, func(entryCount int64) {
bf = bloom.NewWithEstimates(uint(entryCount), 0.001)
- }, func(key NeedleId, offset Offset, size uint32) error {
+ }, func(key NeedleId, offset Offset, size Size) error {
mm.MaybeSetMaxFileKey(key)
NeedleIdToBytes(buf, key)
- if size != TombstoneFileSize {
+ if size.IsValid() {
mm.FileByteCounter += uint64(size)
}
@@ -111,7 +111,7 @@ func newNeedleMapMetricFromIndexFile(r *os.File) (mm *mapMetric, err error) {
} else {
// deleted file
mm.DeletionCounter++
- if size != TombstoneFileSize {
+ if size.IsValid() {
// previously already deleted file
mm.DeletionByteCounter += uint64(size)
}
@@ -121,7 +121,7 @@ func newNeedleMapMetricFromIndexFile(r *os.File) (mm *mapMetric, err error) {
return
}
-func reverseWalkIndexFile(r *os.File, initFn func(entryCount int64), fn func(key NeedleId, offset Offset, size uint32) error) error {
+func reverseWalkIndexFile(r *os.File, initFn func(entryCount int64), fn func(key NeedleId, offset Offset, size Size) error) error {
fi, err := r.Stat()
if err != nil {
return fmt.Errorf("file %s stat error: %v", r.Name(), err)
diff --git a/weed/storage/needle_map_metric_test.go b/weed/storage/needle_map_metric_test.go
index ae2177a30..362659a11 100644
--- a/weed/storage/needle_map_metric_test.go
+++ b/weed/storage/needle_map_metric_test.go
@@ -15,7 +15,7 @@ func TestFastLoadingNeedleMapMetrics(t *testing.T) {
nm := NewCompactNeedleMap(idxFile)
for i := 0; i < 10000; i++ {
- nm.Put(Uint64ToNeedleId(uint64(i+1)), Uint32ToOffset(uint32(0)), uint32(1))
+ nm.Put(Uint64ToNeedleId(uint64(i+1)), Uint32ToOffset(uint32(0)), Size(1))
if rand.Float32() < 0.2 {
nm.Delete(Uint64ToNeedleId(uint64(rand.Int63n(int64(i))+1)), Uint32ToOffset(uint32(0)))
}
diff --git a/weed/storage/needle_map_sorted_file.go b/weed/storage/needle_map_sorted_file.go
index e6f9258f3..662b90531 100644
--- a/weed/storage/needle_map_sorted_file.go
+++ b/weed/storage/needle_map_sorted_file.go
@@ -16,18 +16,18 @@ type SortedFileNeedleMap struct {
dbFileSize int64
}
-func NewSortedFileNeedleMap(baseFileName string, indexFile *os.File) (m *SortedFileNeedleMap, err error) {
- m = &SortedFileNeedleMap{baseFileName: baseFileName}
+func NewSortedFileNeedleMap(indexBaseFileName string, indexFile *os.File) (m *SortedFileNeedleMap, err error) {
+ m = &SortedFileNeedleMap{baseFileName: indexBaseFileName}
m.indexFile = indexFile
- fileName := baseFileName + ".sdx"
+ fileName := indexBaseFileName + ".sdx"
if !isSortedFileFresh(fileName, indexFile) {
glog.V(0).Infof("Start to Generate %s from %s", fileName, indexFile.Name())
- erasure_coding.WriteSortedFileFromIdx(baseFileName, ".sdx")
+ erasure_coding.WriteSortedFileFromIdx(indexBaseFileName, ".sdx")
glog.V(0).Infof("Finished Generating %s from %s", fileName, indexFile.Name())
}
glog.V(1).Infof("Opening %s...", fileName)
- if m.dbFile, err = os.Open(baseFileName + ".sdx"); err != nil {
+ if m.dbFile, err = os.Open(indexBaseFileName + ".sdx"); err != nil {
return
}
dbStat, _ := m.dbFile.Stat()
@@ -65,7 +65,7 @@ func (m *SortedFileNeedleMap) Get(key NeedleId) (element *needle_map.NeedleValue
}
-func (m *SortedFileNeedleMap) Put(key NeedleId, offset Offset, size uint32) error {
+func (m *SortedFileNeedleMap) Put(key NeedleId, offset Offset, size Size) error {
return os.ErrInvalid
}
@@ -80,7 +80,7 @@ func (m *SortedFileNeedleMap) Delete(key NeedleId, offset Offset) error {
return err
}
- if size == TombstoneFileSize {
+ if size.IsDeleted() {
return nil
}
@@ -94,8 +94,12 @@ func (m *SortedFileNeedleMap) Delete(key NeedleId, offset Offset) error {
}
func (m *SortedFileNeedleMap) Close() {
- m.indexFile.Close()
- m.dbFile.Close()
+ if m.indexFile != nil {
+ m.indexFile.Close()
+ }
+ if m.dbFile != nil {
+ m.dbFile.Close()
+ }
}
func (m *SortedFileNeedleMap) Destroy() error {
diff --git a/weed/storage/store.go b/weed/storage/store.go
index 512f72ceb..6be15a4c9 100644
--- a/weed/storage/store.go
+++ b/weed/storage/store.go
@@ -2,13 +2,17 @@ package storage
import (
"fmt"
+ "path/filepath"
+ "strings"
"sync/atomic"
"google.golang.org/grpc"
"github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
"github.com/chrislusf/seaweedfs/weed/stats"
+ "github.com/chrislusf/seaweedfs/weed/storage/erasure_coding"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
"github.com/chrislusf/seaweedfs/weed/storage/super_block"
. "github.com/chrislusf/seaweedfs/weed/storage/types"
@@ -18,21 +22,25 @@ const (
MAX_TTL_VOLUME_REMOVAL_DELAY = 10 // 10 minutes
)
+type ReadOption struct {
+ ReadDeleted bool
+}
+
/*
* A VolumeServer contains one Store
*/
type Store struct {
MasterAddress string
grpcDialOption grpc.DialOption
- volumeSizeLimit uint64 //read from the master
+ volumeSizeLimit uint64 // read from the master
Ip string
Port int
PublicUrl string
Locations []*DiskLocation
- dataCenter string //optional informaton, overwriting master setting if exists
- rack string //optional information, overwriting master setting if exists
+	dataCenter string // optional information, overwriting master setting if exists
+ rack string // optional information, overwriting master setting if exists
connected bool
- NeedleMapType NeedleMapType
+ NeedleMapKind NeedleMapKind
NewVolumesChan chan master_pb.VolumeShortInformationMessage
DeletedVolumesChan chan master_pb.VolumeShortInformationMessage
NewEcShardsChan chan master_pb.VolumeEcShardInformationMessage
@@ -44,11 +52,11 @@ func (s *Store) String() (str string) {
return
}
-func NewStore(grpcDialOption grpc.DialOption, port int, ip, publicUrl string, dirnames []string, maxVolumeCounts []int, needleMapKind NeedleMapType) (s *Store) {
- s = &Store{grpcDialOption: grpcDialOption, Port: port, Ip: ip, PublicUrl: publicUrl, NeedleMapType: needleMapKind}
+func NewStore(grpcDialOption grpc.DialOption, port int, ip, publicUrl string, dirnames []string, maxVolumeCounts []int, minFreeSpacePercents []float32, idxFolder string, needleMapKind NeedleMapKind, diskTypes []DiskType) (s *Store) {
+ s = &Store{grpcDialOption: grpcDialOption, Port: port, Ip: ip, PublicUrl: publicUrl, NeedleMapKind: needleMapKind}
s.Locations = make([]*DiskLocation, 0)
for i := 0; i < len(dirnames); i++ {
- location := NewDiskLocation(dirnames[i], maxVolumeCounts[i])
+ location := NewDiskLocation(dirnames[i], maxVolumeCounts[i], minFreeSpacePercents[i], idxFolder, diskTypes[i])
location.loadExistingVolumes(needleMapKind)
s.Locations = append(s.Locations, location)
stats.VolumeServerMaxVolumeCounter.Add(float64(maxVolumeCounts[i]))
@@ -61,7 +69,7 @@ func NewStore(grpcDialOption grpc.DialOption, port int, ip, publicUrl string, di
return
}
-func (s *Store) AddVolume(volumeId needle.VolumeId, collection string, needleMapKind NeedleMapType, replicaPlacement string, ttlString string, preallocate int64, MemoryMapMaxSizeMb uint32) error {
+func (s *Store) AddVolume(volumeId needle.VolumeId, collection string, needleMapKind NeedleMapKind, replicaPlacement string, ttlString string, preallocate int64, MemoryMapMaxSizeMb uint32, diskType DiskType) error {
rt, e := super_block.NewReplicaPlacementFromString(replicaPlacement)
if e != nil {
return e
@@ -70,7 +78,7 @@ func (s *Store) AddVolume(volumeId needle.VolumeId, collection string, needleMap
if e != nil {
return e
}
- e = s.addVolume(volumeId, collection, needleMapKind, rt, ttl, preallocate, MemoryMapMaxSizeMb)
+ e = s.addVolume(volumeId, collection, needleMapKind, rt, ttl, preallocate, MemoryMapMaxSizeMb, diskType)
return e
}
func (s *Store) DeleteCollection(collection string) (e error) {
@@ -92,10 +100,19 @@ func (s *Store) findVolume(vid needle.VolumeId) *Volume {
}
return nil
}
-func (s *Store) FindFreeLocation() (ret *DiskLocation) {
+func (s *Store) FindFreeLocation(diskType DiskType) (ret *DiskLocation) {
max := 0
for _, location := range s.Locations {
+ if diskType != location.DiskType {
+ continue
+ }
+ if location.isDiskSpaceLow {
+ continue
+ }
currentFreeCount := location.MaxVolumeCount - location.VolumesLen()
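+		// count EC usage against free slots: scale to shard units, subtract the
+		// EC volumes hosted here (each ~1/DataShardsCount of a slot), scale back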
+ currentFreeCount *= erasure_coding.DataShardsCount
+ currentFreeCount -= location.EcVolumesLen()
+ currentFreeCount /= erasure_coding.DataShardsCount
if currentFreeCount > max {
max = currentFreeCount
ret = location
@@ -103,14 +120,14 @@ func (s *Store) FindFreeLocation() (ret *DiskLocation) {
}
return ret
}
-func (s *Store) addVolume(vid needle.VolumeId, collection string, needleMapKind NeedleMapType, replicaPlacement *super_block.ReplicaPlacement, ttl *needle.TTL, preallocate int64, memoryMapMaxSizeMb uint32) error {
+func (s *Store) addVolume(vid needle.VolumeId, collection string, needleMapKind NeedleMapKind, replicaPlacement *super_block.ReplicaPlacement, ttl *needle.TTL, preallocate int64, memoryMapMaxSizeMb uint32, diskType DiskType) error {
if s.findVolume(vid) != nil {
return fmt.Errorf("Volume Id %d already exists!", vid)
}
- if location := s.FindFreeLocation(); location != nil {
+ if location := s.FindFreeLocation(diskType); location != nil {
glog.V(0).Infof("In dir %s adds volume:%v collection:%s replicaPlacement:%v ttl:%v",
location.Directory, vid, collection, replicaPlacement, ttl)
- if volume, err := NewVolume(location.Directory, collection, vid, needleMapKind, replicaPlacement, ttl, preallocate, memoryMapMaxSizeMb); err == nil {
+ if volume, err := NewVolume(location.Directory, location.IdxDirectory, collection, vid, needleMapKind, replicaPlacement, ttl, preallocate, memoryMapMaxSizeMb); err == nil {
location.SetVolume(vid, volume)
glog.V(0).Infof("add volume %d", vid)
s.NewVolumesChan <- master_pb.VolumeShortInformationMessage{
@@ -119,6 +136,7 @@ func (s *Store) addVolume(vid needle.VolumeId, collection string, needleMapKind
ReplicaPlacement: uint32(replicaPlacement.Byte()),
Version: uint32(volume.Version()),
Ttl: ttl.ToUint32(),
+ DiskType: string(diskType),
}
return nil
} else {
@@ -128,64 +146,130 @@ func (s *Store) addVolume(vid needle.VolumeId, collection string, needleMapKind
return fmt.Errorf("No more free space left")
}
-func (s *Store) VolumeInfos() []*VolumeInfo {
- var stats []*VolumeInfo
+func (s *Store) VolumeInfos() (allStats []*VolumeInfo) {
for _, location := range s.Locations {
- location.volumesLock.RLock()
- for k, v := range location.volumes {
- s := &VolumeInfo{
- Id: needle.VolumeId(k),
- Size: v.ContentSize(),
- Collection: v.Collection,
- ReplicaPlacement: v.ReplicaPlacement,
- Version: v.Version(),
- FileCount: int(v.FileCount()),
- DeleteCount: int(v.DeletedCount()),
- DeletedByteCount: v.DeletedSize(),
- ReadOnly: v.noWriteOrDelete || v.noWriteCanDelete,
- Ttl: v.Ttl,
- CompactRevision: uint32(v.CompactionRevision),
- }
- s.RemoteStorageName, s.RemoteStorageKey = v.RemoteStorageNameKey()
- stats = append(stats, s)
- }
- location.volumesLock.RUnlock()
+ stats := collectStatsForOneLocation(location)
+ allStats = append(allStats, stats...)
+ }
+ sortVolumeInfos(allStats)
+ return allStats
+}
+
+func collectStatsForOneLocation(location *DiskLocation) (stats []*VolumeInfo) {
+ location.volumesLock.RLock()
+ defer location.volumesLock.RUnlock()
+
+ for k, v := range location.volumes {
+ s := collectStatForOneVolume(k, v)
+ stats = append(stats, s)
}
- sortVolumeInfos(stats)
return stats
}
+func collectStatForOneVolume(vid needle.VolumeId, v *Volume) (s *VolumeInfo) {
+
+ s = &VolumeInfo{
+ Id: vid,
+ Collection: v.Collection,
+ ReplicaPlacement: v.ReplicaPlacement,
+ Version: v.Version(),
+ ReadOnly: v.IsReadOnly(),
+ Ttl: v.Ttl,
+ CompactRevision: uint32(v.CompactionRevision),
+ DiskType: v.DiskType().String(),
+ }
+ s.RemoteStorageName, s.RemoteStorageKey = v.RemoteStorageNameKey()
+
+ v.dataFileAccessLock.RLock()
+ defer v.dataFileAccessLock.RUnlock()
+
+ if v.nm == nil {
+ return
+ }
+
+ s.FileCount = v.nm.FileCount()
+ s.DeleteCount = v.nm.DeletedCount()
+ s.DeletedByteCount = v.nm.DeletedSize()
+ s.Size = v.nm.ContentSize()
+
+ return
+}
+
func (s *Store) SetDataCenter(dataCenter string) {
s.dataCenter = dataCenter
}
func (s *Store) SetRack(rack string) {
s.rack = rack
}
+func (s *Store) GetDataCenter() string {
+ return s.dataCenter
+}
+func (s *Store) GetRack() string {
+ return s.rack
+}
func (s *Store) CollectHeartbeat() *master_pb.Heartbeat {
var volumeMessages []*master_pb.VolumeInformationMessage
- maxVolumeCount := 0
+ maxVolumeCounts := make(map[string]uint32)
var maxFileKey NeedleId
collectionVolumeSize := make(map[string]uint64)
+ collectionVolumeReadOnlyCount := make(map[string]map[string]uint8)
for _, location := range s.Locations {
var deleteVids []needle.VolumeId
- maxVolumeCount = maxVolumeCount + location.MaxVolumeCount
+ maxVolumeCounts[string(location.DiskType)] += uint32(location.MaxVolumeCount)
location.volumesLock.RLock()
for _, v := range location.volumes {
- if maxFileKey < v.MaxFileKey() {
- maxFileKey = v.MaxFileKey()
+ curMaxFileKey, volumeMessage := v.ToVolumeInformationMessage()
+ if volumeMessage == nil {
+ continue
+ }
+ if maxFileKey < curMaxFileKey {
+ maxFileKey = curMaxFileKey
}
- if !v.expired(s.GetVolumeSizeLimit()) {
- volumeMessages = append(volumeMessages, v.ToVolumeInformationMessage())
+ deleteVolume := false
+ if !v.expired(volumeMessage.Size, s.GetVolumeSizeLimit()) {
+ volumeMessages = append(volumeMessages, volumeMessage)
} else {
if v.expiredLongEnough(MAX_TTL_VOLUME_REMOVAL_DELAY) {
deleteVids = append(deleteVids, v.Id)
+ deleteVolume = true
} else {
- glog.V(0).Infoln("volume", v.Id, "is expired.")
+ glog.V(0).Infof("volume %d is expired", v.Id)
+ }
+ if v.lastIoError != nil {
+ deleteVids = append(deleteVids, v.Id)
+ deleteVolume = true
+ glog.Warningf("volume %d has IO error: %v", v.Id, v.lastIoError)
+ }
+ }
+
+ if _, exist := collectionVolumeSize[v.Collection]; !exist {
+ collectionVolumeSize[v.Collection] = 0
+ }
+ if !deleteVolume {
+ collectionVolumeSize[v.Collection] += volumeMessage.Size
+ }
+
+ if _, exist := collectionVolumeReadOnlyCount[v.Collection]; !exist {
+ collectionVolumeReadOnlyCount[v.Collection] = map[string]uint8{
+ "IsReadOnly": 0,
+ "noWriteOrDelete": 0,
+ "noWriteCanDelete": 0,
+ "isDiskSpaceLow": 0,
+ }
+ }
+ if !deleteVolume && v.IsReadOnly() {
+ collectionVolumeReadOnlyCount[v.Collection]["IsReadOnly"] += 1
+ if v.noWriteOrDelete {
+ collectionVolumeReadOnlyCount[v.Collection]["noWriteOrDelete"] += 1
+ }
+ if v.noWriteCanDelete {
+ collectionVolumeReadOnlyCount[v.Collection]["noWriteCanDelete"] += 1
+ }
+ if v.location.isDiskSpaceLow {
+ collectionVolumeReadOnlyCount[v.Collection]["isDiskSpaceLow"] += 1
}
}
- fileSize, _, _ := v.FileStat()
- collectionVolumeSize[v.Collection] += fileSize
}
location.volumesLock.RUnlock()
@@ -193,8 +277,14 @@ func (s *Store) CollectHeartbeat() *master_pb.Heartbeat {
// delete expired volumes.
location.volumesLock.Lock()
for _, vid := range deleteVids {
- location.deleteVolumeById(vid)
- glog.V(0).Infoln("volume", vid, "is deleted.")
+ found, err := location.deleteVolumeById(vid)
+ if err == nil {
+ if found {
+ glog.V(0).Infof("volume %d is deleted", vid)
+ }
+ } else {
+ glog.Warningf("delete volume %d: %v", vid, err)
+ }
}
location.volumesLock.Unlock()
}
@@ -204,16 +294,22 @@ func (s *Store) CollectHeartbeat() *master_pb.Heartbeat {
stats.VolumeServerDiskSizeGauge.WithLabelValues(col, "normal").Set(float64(size))
}
+ for col, types := range collectionVolumeReadOnlyCount {
+ for t, count := range types {
+ stats.VolumeServerReadOnlyVolumeGauge.WithLabelValues(col, t).Set(float64(count))
+ }
+ }
+
return &master_pb.Heartbeat{
- Ip: s.Ip,
- Port: uint32(s.Port),
- PublicUrl: s.PublicUrl,
- MaxVolumeCount: uint32(maxVolumeCount),
- MaxFileKey: NeedleIdToUint64(maxFileKey),
- DataCenter: s.dataCenter,
- Rack: s.rack,
- Volumes: volumeMessages,
- HasNoVolumes: len(volumeMessages) == 0,
+ Ip: s.Ip,
+ Port: uint32(s.Port),
+ PublicUrl: s.PublicUrl,
+ MaxVolumeCounts: maxVolumeCounts,
+ MaxFileKey: NeedleIdToUint64(maxFileKey),
+ DataCenter: s.dataCenter,
+ Rack: s.rack,
+ Volumes: volumeMessages,
+ HasNoVolumes: len(volumeMessages) == 0,
}
}
@@ -224,17 +320,13 @@ func (s *Store) Close() {
}
}
-func (s *Store) WriteVolumeNeedle(i needle.VolumeId, n *needle.Needle) (size uint32, isUnchanged bool, err error) {
+func (s *Store) WriteVolumeNeedle(i needle.VolumeId, n *needle.Needle, fsync bool) (isUnchanged bool, err error) {
if v := s.findVolume(i); v != nil {
- if v.noWriteOrDelete || v.noWriteCanDelete {
+ if v.IsReadOnly() {
err = fmt.Errorf("volume %d is read only", i)
return
}
- if MaxPossibleVolumeSize >= v.ContentSize()+uint64(needle.GetActualSize(size, v.Version())) {
- _, size, isUnchanged, err = v.writeNeedle(n)
- } else {
- err = fmt.Errorf("volume size limit %d exceeded! current size is %d", s.GetVolumeSizeLimit(), v.ContentSize())
- }
+ _, _, isUnchanged, err = v.writeNeedle2(n, fsync)
return
}
glog.V(0).Infoln("volume", i, "not found!")
@@ -242,23 +334,19 @@ func (s *Store) WriteVolumeNeedle(i needle.VolumeId, n *needle.Needle) (size uin
return
}
-func (s *Store) DeleteVolumeNeedle(i needle.VolumeId, n *needle.Needle) (uint32, error) {
+func (s *Store) DeleteVolumeNeedle(i needle.VolumeId, n *needle.Needle) (Size, error) {
if v := s.findVolume(i); v != nil {
if v.noWriteOrDelete {
return 0, fmt.Errorf("volume %d is read only", i)
}
- if MaxPossibleVolumeSize >= v.ContentSize()+uint64(needle.GetActualSize(0, v.Version())) {
- return v.deleteNeedle(n)
- } else {
- return 0, fmt.Errorf("volume size limit %d exceeded! current size is %d", s.GetVolumeSizeLimit(), v.ContentSize())
- }
+ return v.deleteNeedle2(n)
}
return 0, fmt.Errorf("volume %d not found on %s:%d", i, s.Ip, s.Port)
}
-func (s *Store) ReadVolumeNeedle(i needle.VolumeId, n *needle.Needle) (int, error) {
+func (s *Store) ReadVolumeNeedle(i needle.VolumeId, n *needle.Needle, readOption *ReadOption) (int, error) {
if v := s.findVolume(i); v != nil {
- return v.readNeedle(n)
+ return v.readNeedle(n, readOption)
}
return 0, fmt.Errorf("volume %d not found", i)
}
@@ -276,13 +364,26 @@ func (s *Store) MarkVolumeReadonly(i needle.VolumeId) error {
if v == nil {
return fmt.Errorf("volume %d not found", i)
}
+ v.noWriteLock.Lock()
v.noWriteOrDelete = true
+ v.noWriteLock.Unlock()
+ return nil
+}
+
+func (s *Store) MarkVolumeWritable(i needle.VolumeId) error {
+ v := s.findVolume(i)
+ if v == nil {
+ return fmt.Errorf("volume %d not found", i)
+ }
+ v.noWriteLock.Lock()
+ v.noWriteOrDelete = false
+ v.noWriteLock.Unlock()
return nil
}
func (s *Store) MountVolume(i needle.VolumeId) error {
for _, location := range s.Locations {
- if found := location.LoadVolume(i, s.NeedleMapType); found == true {
+ if found := location.LoadVolume(i, s.NeedleMapKind); found == true {
glog.V(0).Infof("mount volume %d", i)
v := s.findVolume(i)
s.NewVolumesChan <- master_pb.VolumeShortInformationMessage{
@@ -291,6 +392,7 @@ func (s *Store) MountVolume(i needle.VolumeId) error {
ReplicaPlacement: uint32(v.ReplicaPlacement.Byte()),
Version: uint32(v.Version()),
Ttl: v.Ttl.ToUint32(),
+ DiskType: string(v.location.DiskType),
}
return nil
}
@@ -310,6 +412,7 @@ func (s *Store) UnmountVolume(i needle.VolumeId) error {
ReplicaPlacement: uint32(v.ReplicaPlacement.Byte()),
Version: uint32(v.Version()),
Ttl: v.Ttl.ToUint32(),
+ DiskType: string(v.location.DiskType),
}
for _, location := range s.Locations {
@@ -326,7 +429,7 @@ func (s *Store) UnmountVolume(i needle.VolumeId) error {
func (s *Store) DeleteVolume(i needle.VolumeId) error {
v := s.findVolume(i)
if v == nil {
- return nil
+ return fmt.Errorf("volume %d not found on disk, cannot delete", i)
}
message := master_pb.VolumeShortInformationMessage{
Id: uint32(v.Id),
@@ -334,18 +437,46 @@ func (s *Store) DeleteVolume(i needle.VolumeId) error {
ReplicaPlacement: uint32(v.ReplicaPlacement.Byte()),
Version: uint32(v.Version()),
Ttl: v.Ttl.ToUint32(),
+ DiskType: string(v.location.DiskType),
}
for _, location := range s.Locations {
- if error := location.deleteVolumeById(i); error == nil {
+ if err := location.DeleteVolume(i); err == nil {
glog.V(0).Infof("DeleteVolume %d", i)
s.DeletedVolumesChan <- message
return nil
+ } else {
+ glog.Errorf("DeleteVolume %d: %v", i, err)
}
}
return fmt.Errorf("volume %d not found on disk", i)
}
+func (s *Store) ConfigureVolume(i needle.VolumeId, replication string) error {
+
+ for _, location := range s.Locations {
+ fileInfo, found := location.LocateVolume(i)
+ if !found {
+ continue
+ }
+ // load, modify, save
+ baseFileName := strings.TrimSuffix(fileInfo.Name(), filepath.Ext(fileInfo.Name()))
+ vifFile := filepath.Join(location.Directory, baseFileName+".vif")
+ volumeInfo, _, _, err := pb.MaybeLoadVolumeInfo(vifFile)
+ if err != nil {
+ return fmt.Errorf("volume %d fail to load vif", i)
+ }
+ volumeInfo.Replication = replication
+ err = pb.SaveVolumeInfo(vifFile, volumeInfo)
+ if err != nil {
+ return fmt.Errorf("volume %d fail to save vif", i)
+ }
+ return nil
+ }
+
+ return fmt.Errorf("volume %d not found on disk", i)
+}
+
func (s *Store) SetVolumeSizeLimit(x uint64) {
atomic.StoreUint64(&s.volumeSizeLimit, x)
}
@@ -353,3 +484,28 @@ func (s *Store) SetVolumeSizeLimit(x uint64) {
func (s *Store) GetVolumeSizeLimit() uint64 {
return atomic.LoadUint64(&s.volumeSizeLimit)
}
+
+func (s *Store) MaybeAdjustVolumeMax() (hasChanges bool) {
+ volumeSizeLimit := s.GetVolumeSizeLimit()
+ if volumeSizeLimit == 0 {
+ return
+ }
+ for _, diskLocation := range s.Locations {
+ if diskLocation.OriginalMaxVolumeCount == 0 {
+ currentMaxVolumeCount := diskLocation.MaxVolumeCount
+ diskStatus := stats.NewDiskStatus(diskLocation.Directory)
+ unusedSpace := diskLocation.UnUsedSpace(volumeSizeLimit)
+ unclaimedSpaces := int64(diskStatus.Free) - int64(unusedSpace)
+ volCount := diskLocation.VolumesLen()
+ maxVolumeCount := volCount
+ if unclaimedSpaces > int64(volumeSizeLimit) {
+ maxVolumeCount += int(uint64(unclaimedSpaces)/volumeSizeLimit) - 1
+ }
+ diskLocation.MaxVolumeCount = maxVolumeCount
+ glog.V(2).Infof("disk %s max %d unclaimedSpace:%dMB, unused:%dMB volumeSizeLimit:%dMB",
+ diskLocation.Directory, maxVolumeCount, unclaimedSpaces/1024/1024, unusedSpace/1024/1024, volumeSizeLimit/1024/1024)
+ hasChanges = hasChanges || currentMaxVolumeCount != diskLocation.MaxVolumeCount
+ }
+ }
+ return
+}
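The MaxVolumeCount arithmetic in MaybeAdjustVolumeMax is easy to misread in diff form. Below is a self-contained sketch with made-up numbers (the 30GB limit, free-space figure, and volume count are hypothetical, not from this commit):

```go
package main

import "fmt"

// Hypothetical inputs: a 30GB volume size limit, 10 existing volumes,
// 500GB free on disk, and 50GB of that still claimable by existing,
// not-yet-full volumes.
func main() {
	volumeSizeLimit := uint64(30 << 30)
	diskFree := int64(500 << 30)
	unusedSpace := int64(50 << 30)
	volCount := 10

	// space that no existing volume will ever grow into
	unclaimedSpaces := diskFree - unusedSpace

	maxVolumeCount := volCount
	if unclaimedSpaces > int64(volumeSizeLimit) {
		// keep one volume's worth of slack, as the code above does
		maxVolumeCount += int(uint64(unclaimedSpaces)/volumeSizeLimit) - 1
	}
	fmt.Println(maxVolumeCount) // 24 = 10 + 450/30 - 1
}
```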
diff --git a/weed/storage/store_ec.go b/weed/storage/store_ec.go
index 27406451f..9702fdd50 100644
--- a/weed/storage/store_ec.go
+++ b/weed/storage/store_ec.go
@@ -4,6 +4,7 @@ import (
"context"
"fmt"
"io"
+ "os"
"sort"
"sync"
"time"
@@ -57,8 +58,11 @@ func (s *Store) MountEcShards(collection string, vid needle.VolumeId, shardId er
Id: uint32(vid),
Collection: collection,
EcIndexBits: uint32(shardBits.AddShardId(shardId)),
+ DiskType: string(location.DiskType),
}
return nil
+ } else if os.IsNotExist(err) {
+ continue
} else {
return fmt.Errorf("%s load ec shard %d.%d: %v", location.Directory, vid, shardId, err)
}
@@ -79,6 +83,7 @@ func (s *Store) UnmountEcShards(vid needle.VolumeId, shardId erasure_coding.Shar
Id: uint32(vid),
Collection: ecShard.Collection,
EcIndexBits: uint32(shardBits.AddShardId(shardId)),
+ DiskType: string(ecShard.DiskType),
}
for _, location := range s.Locations {
@@ -116,7 +121,7 @@ func (s *Store) DestroyEcVolume(vid needle.VolumeId) {
}
}
-func (s *Store) ReadEcShardNeedle(ctx context.Context, vid needle.VolumeId, n *needle.Needle) (int, error) {
+func (s *Store) ReadEcShardNeedle(vid needle.VolumeId, n *needle.Needle) (int, error) {
for _, location := range s.Locations {
if localEcVolume, found := location.FindEcVolume(vid); found {
@@ -124,24 +129,24 @@ func (s *Store) ReadEcShardNeedle(ctx context.Context, vid needle.VolumeId, n *n
if err != nil {
return 0, fmt.Errorf("locate in local ec volume: %v", err)
}
- if size == types.TombstoneFileSize {
- return 0, fmt.Errorf("entry %s is deleted", n.Id)
+ if size.IsDeleted() {
+ return 0, ErrorDeleted
}
- glog.V(3).Infof("read ec volume %d offset %d size %d intervals:%+v", vid, offset.ToAcutalOffset(), size, intervals)
+ glog.V(3).Infof("read ec volume %d offset %d size %d intervals:%+v", vid, offset.ToActualOffset(), size, intervals)
if len(intervals) > 1 {
glog.V(3).Infof("ReadEcShardNeedle needle id %s intervals:%+v", n.String(), intervals)
}
- bytes, isDeleted, err := s.readEcShardIntervals(ctx, vid, n.Id, localEcVolume, intervals)
+ bytes, isDeleted, err := s.readEcShardIntervals(vid, n.Id, localEcVolume, intervals)
if err != nil {
return 0, fmt.Errorf("ReadEcShardIntervals: %v", err)
}
if isDeleted {
- return 0, fmt.Errorf("ec entry %s is deleted", n.Id)
+ return 0, ErrorDeleted
}
- err = n.ReadBytes(bytes, offset.ToAcutalOffset(), size, localEcVolume.Version)
+ err = n.ReadBytes(bytes, offset.ToActualOffset(), size, localEcVolume.Version)
if err != nil {
return 0, fmt.Errorf("readbytes: %v", err)
}
@@ -152,14 +157,14 @@ func (s *Store) ReadEcShardNeedle(ctx context.Context, vid needle.VolumeId, n *n
return 0, fmt.Errorf("ec shard %d not found", vid)
}
-func (s *Store) readEcShardIntervals(ctx context.Context, vid needle.VolumeId, needleId types.NeedleId, ecVolume *erasure_coding.EcVolume, intervals []erasure_coding.Interval) (data []byte, is_deleted bool, err error) {
+func (s *Store) readEcShardIntervals(vid needle.VolumeId, needleId types.NeedleId, ecVolume *erasure_coding.EcVolume, intervals []erasure_coding.Interval) (data []byte, is_deleted bool, err error) {
- if err = s.cachedLookupEcShardLocations(ctx, ecVolume); err != nil {
+ if err = s.cachedLookupEcShardLocations(ecVolume); err != nil {
return nil, false, fmt.Errorf("failed to locate shard via master grpc %s: %v", s.MasterAddress, err)
}
for i, interval := range intervals {
- if d, isDeleted, e := s.readOneEcShardInterval(ctx, needleId, ecVolume, interval); e != nil {
+ if d, isDeleted, e := s.readOneEcShardInterval(needleId, ecVolume, interval); e != nil {
return nil, isDeleted, e
} else {
if isDeleted {
@@ -175,12 +180,12 @@ func (s *Store) readEcShardIntervals(ctx context.Context, vid needle.VolumeId, n
return
}
-func (s *Store) readOneEcShardInterval(ctx context.Context, needleId types.NeedleId, ecVolume *erasure_coding.EcVolume, interval erasure_coding.Interval) (data []byte, is_deleted bool, err error) {
+func (s *Store) readOneEcShardInterval(needleId types.NeedleId, ecVolume *erasure_coding.EcVolume, interval erasure_coding.Interval) (data []byte, is_deleted bool, err error) {
shardId, actualOffset := interval.ToShardIdAndOffset(erasure_coding.ErasureCodingLargeBlockSize, erasure_coding.ErasureCodingSmallBlockSize)
data = make([]byte, interval.Size)
if shard, found := ecVolume.FindEcVolumeShard(shardId); found {
if _, err = shard.ReadAt(data, actualOffset); err != nil {
- glog.V(0).Infof("read local ec shard %d.%d: %v", ecVolume.VolumeId, shardId, err)
+ glog.V(0).Infof("read local ec shard %d.%d offset %d: %v", ecVolume.VolumeId, shardId, actualOffset, err)
return
}
} else {
@@ -190,16 +195,15 @@ func (s *Store) readOneEcShardInterval(ctx context.Context, needleId types.Needl
// try reading directly
if hasShardIdLocation {
- _, is_deleted, err = s.readRemoteEcShardInterval(ctx, sourceDataNodes, needleId, ecVolume.VolumeId, shardId, data, actualOffset)
+ _, is_deleted, err = s.readRemoteEcShardInterval(sourceDataNodes, needleId, ecVolume.VolumeId, shardId, data, actualOffset)
if err == nil {
return
}
glog.V(0).Infof("clearing ec shard %d.%d locations: %v", ecVolume.VolumeId, shardId, err)
- forgetShardId(ecVolume, shardId)
}
// try reading by recovering from other shards
- _, is_deleted, err = s.recoverOneRemoteEcShardInterval(ctx, needleId, ecVolume, shardId, data, actualOffset)
+ _, is_deleted, err = s.recoverOneRemoteEcShardInterval(needleId, ecVolume, shardId, data, actualOffset)
if err == nil {
return
}
@@ -215,7 +219,7 @@ func forgetShardId(ecVolume *erasure_coding.EcVolume, shardId erasure_coding.Sha
ecVolume.ShardLocationsLock.Unlock()
}
-func (s *Store) cachedLookupEcShardLocations(ctx context.Context, ecVolume *erasure_coding.EcVolume) (err error) {
+func (s *Store) cachedLookupEcShardLocations(ecVolume *erasure_coding.EcVolume) (err error) {
shardCount := len(ecVolume.ShardLocations)
if shardCount < erasure_coding.DataShardsCount &&
@@ -234,7 +238,7 @@ func (s *Store) cachedLookupEcShardLocations(ctx context.Context, ecVolume *eras
req := &master_pb.LookupEcVolumeRequest{
VolumeId: uint32(ecVolume.VolumeId),
}
- resp, err := masterClient.LookupEcVolume(ctx, req)
+ resp, err := masterClient.LookupEcVolume(context.Background(), req)
if err != nil {
return fmt.Errorf("lookup ec volume %d: %v", ecVolume.VolumeId, err)
}
@@ -258,7 +262,7 @@ func (s *Store) cachedLookupEcShardLocations(ctx context.Context, ecVolume *eras
return
}
-func (s *Store) readRemoteEcShardInterval(ctx context.Context, sourceDataNodes []string, needleId types.NeedleId, vid needle.VolumeId, shardId erasure_coding.ShardId, buf []byte, offset int64) (n int, is_deleted bool, err error) {
+func (s *Store) readRemoteEcShardInterval(sourceDataNodes []string, needleId types.NeedleId, vid needle.VolumeId, shardId erasure_coding.ShardId, buf []byte, offset int64) (n int, is_deleted bool, err error) {
if len(sourceDataNodes) == 0 {
return 0, false, fmt.Errorf("failed to find ec shard %d.%d", vid, shardId)
@@ -266,7 +270,7 @@ func (s *Store) readRemoteEcShardInterval(ctx context.Context, sourceDataNodes [
for _, sourceDataNode := range sourceDataNodes {
glog.V(3).Infof("read remote ec shard %d.%d from %s", vid, shardId, sourceDataNode)
- n, is_deleted, err = s.doReadRemoteEcShardInterval(ctx, sourceDataNode, needleId, vid, shardId, buf, offset)
+ n, is_deleted, err = s.doReadRemoteEcShardInterval(sourceDataNode, needleId, vid, shardId, buf, offset)
if err == nil {
return
}
@@ -276,12 +280,12 @@ func (s *Store) readRemoteEcShardInterval(ctx context.Context, sourceDataNodes [
return
}
-func (s *Store) doReadRemoteEcShardInterval(ctx context.Context, sourceDataNode string, needleId types.NeedleId, vid needle.VolumeId, shardId erasure_coding.ShardId, buf []byte, offset int64) (n int, is_deleted bool, err error) {
+func (s *Store) doReadRemoteEcShardInterval(sourceDataNode string, needleId types.NeedleId, vid needle.VolumeId, shardId erasure_coding.ShardId, buf []byte, offset int64) (n int, is_deleted bool, err error) {
err = operation.WithVolumeServerClient(sourceDataNode, s.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
// copy data slice
- shardReadClient, err := client.VolumeEcShardRead(ctx, &volume_server_pb.VolumeEcShardReadRequest{
+ shardReadClient, err := client.VolumeEcShardRead(context.Background(), &volume_server_pb.VolumeEcShardReadRequest{
VolumeId: uint32(vid),
ShardId: uint32(shardId),
Offset: offset,
@@ -298,7 +302,7 @@ func (s *Store) doReadRemoteEcShardInterval(ctx context.Context, sourceDataNode
break
}
if receiveErr != nil {
- return fmt.Errorf("receiving ec shard %d.%d from %s: %v", vid, shardId, sourceDataNode, err)
+ return fmt.Errorf("receiving ec shard %d.%d from %s: %v", vid, shardId, sourceDataNode, receiveErr)
}
if resp.IsDeleted {
is_deleted = true
@@ -316,7 +320,7 @@ func (s *Store) doReadRemoteEcShardInterval(ctx context.Context, sourceDataNode
return
}
-func (s *Store) recoverOneRemoteEcShardInterval(ctx context.Context, needleId types.NeedleId, ecVolume *erasure_coding.EcVolume, shardIdToRecover erasure_coding.ShardId, buf []byte, offset int64) (n int, is_deleted bool, err error) {
+func (s *Store) recoverOneRemoteEcShardInterval(needleId types.NeedleId, ecVolume *erasure_coding.EcVolume, shardIdToRecover erasure_coding.ShardId, buf []byte, offset int64) (n int, is_deleted bool, err error) {
glog.V(3).Infof("recover ec shard %d.%d from other locations", ecVolume.VolumeId, shardIdToRecover)
enc, err := reedsolomon.New(erasure_coding.DataShardsCount, erasure_coding.ParityShardsCount)
@@ -344,7 +348,7 @@ func (s *Store) recoverOneRemoteEcShardInterval(ctx context.Context, needleId ty
go func(shardId erasure_coding.ShardId, locations []string) {
defer wg.Done()
data := make([]byte, len(buf))
- nRead, isDeleted, readErr := s.readRemoteEcShardInterval(ctx, locations, needleId, ecVolume.VolumeId, shardId, data, offset)
+ nRead, isDeleted, readErr := s.readRemoteEcShardInterval(locations, needleId, ecVolume.VolumeId, shardId, data, offset)
if readErr != nil {
glog.V(3).Infof("recover: readRemoteEcShardInterval %d.%d %d bytes from %+v: %v", ecVolume.VolumeId, shardId, nRead, locations, readErr)
forgetShardId(ecVolume, shardId)
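For readers following the refactor above: readOneEcShardInterval tries three sources in order. The sketch below is an illustrative reduction of that control flow, not the actual implementation; the three closures stand in for the local shard read, the remote shard read, and the Reed-Solomon reconstruction.

```go
package main

import "fmt"

// readWithFallback mirrors the fallback order in readOneEcShardInterval:
// local shard first, then a remote replica, then rebuilding the missing
// shard's bytes from DataShardsCount other shards.
func readWithFallback(local, remote, reconstruct func() error) error {
	if err := local(); err == nil {
		return nil
	}
	if err := remote(); err == nil {
		return nil
	}
	return reconstruct() // last resort: Reed-Solomon recovery
}

func main() {
	err := readWithFallback(
		func() error { return fmt.Errorf("shard not on this server") },
		func() error { return nil }, // a remote replica answers
		func() error { return nil },
	)
	fmt.Println(err) // <nil>
}
```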
diff --git a/weed/storage/store_ec_delete.go b/weed/storage/store_ec_delete.go
index e027d2887..4a75fb20b 100644
--- a/weed/storage/store_ec_delete.go
+++ b/weed/storage/store_ec_delete.go
@@ -12,9 +12,9 @@ import (
"github.com/chrislusf/seaweedfs/weed/storage/types"
)
-func (s *Store) DeleteEcShardNeedle(ctx context.Context, ecVolume *erasure_coding.EcVolume, n *needle.Needle, cookie types.Cookie) (int64, error) {
+func (s *Store) DeleteEcShardNeedle(ecVolume *erasure_coding.EcVolume, n *needle.Needle, cookie types.Cookie) (int64, error) {
- count, err := s.ReadEcShardNeedle(ctx, ecVolume.VolumeId, n)
+ count, err := s.ReadEcShardNeedle(ecVolume.VolumeId, n)
if err != nil {
return 0, err
@@ -24,7 +24,7 @@ func (s *Store) DeleteEcShardNeedle(ctx context.Context, ecVolume *erasure_codin
return 0, fmt.Errorf("unexpected cookie %x", cookie)
}
- if err = s.doDeleteNeedleFromAtLeastOneRemoteEcShards(ctx, ecVolume, n.Id); err != nil {
+ if err = s.doDeleteNeedleFromAtLeastOneRemoteEcShards(ecVolume, n.Id); err != nil {
return 0, err
}
@@ -32,7 +32,7 @@ func (s *Store) DeleteEcShardNeedle(ctx context.Context, ecVolume *erasure_codin
}
-func (s *Store) doDeleteNeedleFromAtLeastOneRemoteEcShards(ctx context.Context, ecVolume *erasure_coding.EcVolume, needleId types.NeedleId) error {
+func (s *Store) doDeleteNeedleFromAtLeastOneRemoteEcShards(ecVolume *erasure_coding.EcVolume, needleId types.NeedleId) error {
_, _, intervals, err := ecVolume.LocateEcShardNeedle(needleId, ecVolume.Version)
@@ -43,13 +43,13 @@ func (s *Store) doDeleteNeedleFromAtLeastOneRemoteEcShards(ctx context.Context,
shardId, _ := intervals[0].ToShardIdAndOffset(erasure_coding.ErasureCodingLargeBlockSize, erasure_coding.ErasureCodingSmallBlockSize)
hasDeletionSuccess := false
- err = s.doDeleteNeedleFromRemoteEcShardServers(ctx, shardId, ecVolume, needleId)
+ err = s.doDeleteNeedleFromRemoteEcShardServers(shardId, ecVolume, needleId)
if err == nil {
hasDeletionSuccess = true
}
for shardId = erasure_coding.DataShardsCount; shardId < erasure_coding.TotalShardsCount; shardId++ {
- if parityDeletionError := s.doDeleteNeedleFromRemoteEcShardServers(ctx, shardId, ecVolume, needleId); parityDeletionError == nil {
+ if parityDeletionError := s.doDeleteNeedleFromRemoteEcShardServers(shardId, ecVolume, needleId); parityDeletionError == nil {
hasDeletionSuccess = true
}
}
@@ -62,7 +62,7 @@ func (s *Store) doDeleteNeedleFromAtLeastOneRemoteEcShards(ctx context.Context,
}
-func (s *Store) doDeleteNeedleFromRemoteEcShardServers(ctx context.Context, shardId erasure_coding.ShardId, ecVolume *erasure_coding.EcVolume, needleId types.NeedleId) error {
+func (s *Store) doDeleteNeedleFromRemoteEcShardServers(shardId erasure_coding.ShardId, ecVolume *erasure_coding.EcVolume, needleId types.NeedleId) error {
ecVolume.ShardLocationsLock.RLock()
sourceDataNodes, hasShardLocations := ecVolume.ShardLocations[shardId]
@@ -74,7 +74,7 @@ func (s *Store) doDeleteNeedleFromRemoteEcShardServers(ctx context.Context, shar
for _, sourceDataNode := range sourceDataNodes {
glog.V(4).Infof("delete from remote ec shard %d.%d from %s", ecVolume.VolumeId, shardId, sourceDataNode)
- err := s.doDeleteNeedleFromRemoteEcShard(ctx, sourceDataNode, ecVolume.VolumeId, ecVolume.Collection, ecVolume.Version, needleId)
+ err := s.doDeleteNeedleFromRemoteEcShard(sourceDataNode, ecVolume.VolumeId, ecVolume.Collection, ecVolume.Version, needleId)
if err != nil {
return err
}
@@ -85,12 +85,12 @@ func (s *Store) doDeleteNeedleFromRemoteEcShardServers(ctx context.Context, shar
}
-func (s *Store) doDeleteNeedleFromRemoteEcShard(ctx context.Context, sourceDataNode string, vid needle.VolumeId, collection string, version needle.Version, needleId types.NeedleId) error {
+func (s *Store) doDeleteNeedleFromRemoteEcShard(sourceDataNode string, vid needle.VolumeId, collection string, version needle.Version, needleId types.NeedleId) error {
return operation.WithVolumeServerClient(sourceDataNode, s.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
// copy data slice
- _, err := client.VolumeEcBlobDelete(ctx, &volume_server_pb.VolumeEcBlobDeleteRequest{
+ _, err := client.VolumeEcBlobDelete(context.Background(), &volume_server_pb.VolumeEcBlobDeleteRequest{
VolumeId: uint32(vid),
Collection: collection,
FileKey: uint64(needleId),
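The deletion fan-out above, delete via the shard that holds the needle and then tombstone every parity shard, succeeding if any write lands, can be summarized as follows. This is a sketch assuming SeaweedFS's default 10+4 shard layout; deleteOnShard is a hypothetical stub.

```go
package main

import "fmt"

const (
	dataShardsCount  = 10 // SeaweedFS default data shards
	totalShardsCount = 14 // 10 data + 4 parity
)

func deleteAcrossShards(needleShard int, deleteOnShard func(shardId int) error) error {
	hasDeletionSuccess := deleteOnShard(needleShard) == nil
	for shardId := dataShardsCount; shardId < totalShardsCount; shardId++ {
		if deleteOnShard(shardId) == nil { // also tombstone each parity shard
			hasDeletionSuccess = true
		}
	}
	if hasDeletionSuccess {
		return nil
	}
	return fmt.Errorf("no shard accepted the deletion")
}

func main() {
	err := deleteAcrossShards(3, func(int) error { return nil })
	fmt.Println(err) // <nil>
}
```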
diff --git a/weed/storage/store_vacuum.go b/weed/storage/store_vacuum.go
index 5dacb71bf..32666a417 100644
--- a/weed/storage/store_vacuum.go
+++ b/weed/storage/store_vacuum.go
@@ -2,6 +2,7 @@ package storage
import (
"fmt"
+ "github.com/chrislusf/seaweedfs/weed/stats"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
@@ -16,7 +17,11 @@ func (s *Store) CheckCompactVolume(volumeId needle.VolumeId) (float64, error) {
}
func (s *Store) CompactVolume(vid needle.VolumeId, preallocate int64, compactionBytePerSecond int64) error {
if v := s.findVolume(vid); v != nil {
- return v.Compact2(preallocate) // compactionBytePerSecond
+ diskStatus := stats.NewDiskStatus(v.dir)
+ if int64(diskStatus.Free) < preallocate {
+ return fmt.Errorf("free space: %d bytes, not enough for %d bytes", diskStatus.Free, preallocate)
+ }
+ return v.Compact2(preallocate, compactionBytePerSecond)
}
return fmt.Errorf("volume id %d is not found during compact", vid)
}
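With the guard added above, compaction now fails fast instead of filling the disk. A hedged usage sketch; the helper, volume id, and limits are hypothetical:

```go
package main

import (
	"github.com/chrislusf/seaweedfs/weed/glog"
	"github.com/chrislusf/seaweedfs/weed/storage"
	"github.com/chrislusf/seaweedfs/weed/storage/needle"
)

// compactOne is a hypothetical caller of the guarded compaction above.
func compactOne(store *storage.Store) {
	// preallocate 1GB for the compacted copy, throttle compaction to 10MB/s
	if err := store.CompactVolume(needle.VolumeId(7), 1<<30, 10<<20); err != nil {
		// fails fast with "free space: ... not enough for ..." when the
		// disk cannot hold the preallocation
		glog.Errorf("compact volume 7: %v", err)
	}
}
```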
diff --git a/weed/storage/super_block/replica_placement.go b/weed/storage/super_block/replica_placement.go
index fcccbba7d..a263e6669 100644
--- a/weed/storage/super_block/replica_placement.go
+++ b/weed/storage/super_block/replica_placement.go
@@ -6,9 +6,9 @@ import (
)
type ReplicaPlacement struct {
- SameRackCount int
- DiffRackCount int
- DiffDataCenterCount int
+ SameRackCount int `json:"node,omitempty"`
+ DiffRackCount int `json:"rack,omitempty"`
+ DiffDataCenterCount int `json:"dc,omitempty"`
}
func NewReplicaPlacementFromString(t string) (*ReplicaPlacement, error) {
@@ -36,6 +36,9 @@ func NewReplicaPlacementFromByte(b byte) (*ReplicaPlacement, error) {
}
func (rp *ReplicaPlacement) Byte() byte {
+ if rp == nil {
+ return 0
+ }
ret := rp.DiffDataCenterCount*100 + rp.DiffRackCount*10 + rp.SameRackCount
return byte(ret)
}
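A worked example of the digit encoding handled by Byte() above, plus the effect of the new nil guard (the replication string is chosen for illustration):

```go
package main

import (
	"fmt"

	"github.com/chrislusf/seaweedfs/weed/storage/super_block"
)

func main() {
	// "120" = 1 different data center, 2 different racks, 0 extra same-rack copies
	rp, _ := super_block.NewReplicaPlacementFromString("120")
	fmt.Println(rp.Byte()) // 120 = 1*100 + 2*10 + 0

	// with the nil guard above, a nil placement now encodes as 0 instead of panicking
	var nilRp *super_block.ReplicaPlacement
	fmt.Println(nilRp.Byte()) // 0
}
```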
diff --git a/weed/storage/types/needle_types.go b/weed/storage/types/needle_types.go
index 2ebb392db..137b97d7f 100644
--- a/weed/storage/types/needle_types.go
+++ b/weed/storage/types/needle_types.go
@@ -2,9 +2,9 @@ package types
import (
"fmt"
- "github.com/chrislusf/seaweedfs/weed/util"
- "math"
"strconv"
+
+ "github.com/chrislusf/seaweedfs/weed/util"
)
type Offset struct {
@@ -12,6 +12,15 @@ type Offset struct {
OffsetLower
}
+type Size int32
+
+func (s Size) IsDeleted() bool {
+ return s < 0 || s == TombstoneFileSize
+}
+func (s Size) IsValid() bool {
+ return s > 0 && s != TombstoneFileSize
+}
+
type OffsetLower struct {
b3 byte
b2 byte
@@ -27,7 +36,7 @@ const (
NeedleMapEntrySize = NeedleIdSize + OffsetSize + SizeSize
TimestampSize = 8 // int64 size
NeedlePaddingSize = 8
- TombstoneFileSize = math.MaxUint32
+ TombstoneFileSize = Size(-1)
CookieSize = 4
)
@@ -49,3 +58,11 @@ func ParseCookie(cookieString string) (Cookie, error) {
}
return Cookie(cookie), nil
}
+
+func BytesToSize(bytes []byte) Size {
+ return Size(util.BytesToUint32(bytes))
+}
+
+func SizeToBytes(bytes []byte, size Size) {
+ util.Uint32toBytes(bytes, uint32(size))
+}
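The switch from a uint32 sentinel to a signed Size makes deletion a simple sign check. A small demonstration relying only on what this diff defines:

```go
package main

import (
	"fmt"

	"github.com/chrislusf/seaweedfs/weed/storage/types"
)

func main() {
	// With TombstoneFileSize now Size(-1), deleted entries are negative sizes.
	fmt.Println(types.TombstoneFileSize.IsDeleted()) // true
	fmt.Println(types.Size(0).IsValid())             // false: empty is neither valid nor deleted
	fmt.Println(types.Size(1024).IsValid())          // true

	// On disk the size is still a uint32, so the old sentinel 0xFFFFFFFF
	// round-trips to -1 through the two's-complement conversion.
	buf := make([]byte, 4)
	types.SizeToBytes(buf, types.TombstoneFileSize)
	fmt.Println(types.BytesToSize(buf) == types.TombstoneFileSize) // true
}
```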
diff --git a/weed/storage/types/offset_4bytes.go b/weed/storage/types/offset_4bytes.go
index 9acd069d3..5348d5b36 100644
--- a/weed/storage/types/offset_4bytes.go
+++ b/weed/storage/types/offset_4bytes.go
@@ -11,8 +11,8 @@ type OffsetHigher struct {
}
const (
- OffsetSize = 4
- MaxPossibleVolumeSize = 4 * 1024 * 1024 * 1024 * 8 // 32GB
+ OffsetSize = 4
+ MaxPossibleVolumeSize uint64 = 4 * 1024 * 1024 * 1024 * 8 // 32GB
)
func OffsetToBytes(bytes []byte, offset Offset) {
@@ -54,7 +54,7 @@ func ToOffset(offset int64) Offset {
return Uint32ToOffset(smaller)
}
-func (offset Offset) ToAcutalOffset() (actualOffset int64) {
+func (offset Offset) ToActualOffset() (actualOffset int64) {
return (int64(offset.b0) + int64(offset.b1)<<8 + int64(offset.b2)<<16 + int64(offset.b3)<<24) * int64(NeedlePaddingSize)
}
diff --git a/weed/storage/types/offset_5bytes.go b/weed/storage/types/offset_5bytes.go
index f57e4f6d4..b6181fc11 100644
--- a/weed/storage/types/offset_5bytes.go
+++ b/weed/storage/types/offset_5bytes.go
@@ -11,8 +11,8 @@ type OffsetHigher struct {
}
const (
- OffsetSize = 4 + 1
- MaxPossibleVolumeSize = 4 * 1024 * 1024 * 1024 * 8 * 256 /* 256 is from the extra byte */ // 8TB
+ OffsetSize = 4 + 1
+ MaxPossibleVolumeSize uint64 = 4 * 1024 * 1024 * 1024 * 8 * 256 /* 256 is from the extra byte */ // 8TB
)
func OffsetToBytes(bytes []byte, offset Offset) {
@@ -71,7 +71,7 @@ func ToOffset(offset int64) Offset {
}
}
-func (offset Offset) ToAcutalOffset() (actualOffset int64) {
+func (offset Offset) ToActualOffset() (actualOffset int64) {
return (int64(offset.b0) + int64(offset.b1)<<8 + int64(offset.b2)<<16 + int64(offset.b3)<<24 + int64(offset.b4)<<32) * int64(NeedlePaddingSize)
}
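Both offset variants store offsets in units of NeedlePaddingSize (8 bytes), which is exactly where the 32GB and 8TB limits above come from; a quick arithmetic check:

```go
package main

import "fmt"

func main() {
	const needlePaddingSize = 8 // NeedlePaddingSize from needle_types.go
	// 4-byte offsets address 2^32 padding units:
	fmt.Println(uint64(1)<<32*needlePaddingSize/(1<<30), "GB") // 32
	// the extra 5th byte multiplies the range by 256:
	fmt.Println(uint64(1)<<40*needlePaddingSize/(1<<40), "TB") // 8
}
```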
diff --git a/weed/storage/types/volume_disk_type.go b/weed/storage/types/volume_disk_type.go
new file mode 100644
index 000000000..c9b87d802
--- /dev/null
+++ b/weed/storage/types/volume_disk_type.go
@@ -0,0 +1,40 @@
+package types
+
+import (
+ "strings"
+)
+
+type DiskType string
+
+const (
+ HardDriveType DiskType = ""
+ SsdType = "ssd"
+)
+
+func ToDiskType(vt string) (diskType DiskType) {
+ vt = strings.ToLower(vt)
+ diskType = HardDriveType
+ switch vt {
+ case "", "hdd":
+ diskType = HardDriveType
+ case "ssd":
+ diskType = SsdType
+ default:
+ diskType = DiskType(vt)
+ }
+ return
+}
+
+func (diskType DiskType) String() string {
+ if diskType == "" {
+ return ""
+ }
+ return string(diskType)
+}
+
+func (diskType DiskType) ReadableString() string {
+ if diskType == "" {
+ return "hdd"
+ }
+ return string(diskType)
+}
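How the new DiskType normalization behaves, using only the functions added above:

```go
package main

import (
	"fmt"

	"github.com/chrislusf/seaweedfs/weed/storage/types"
)

func main() {
	// "" and "hdd" (case-insensitive) normalize to the default hard-drive type;
	// anything else becomes a custom lowercase tag.
	fmt.Printf("%q\n", types.ToDiskType("HDD").String())         // ""
	fmt.Printf("%q\n", types.ToDiskType("SSD").ReadableString()) // "ssd"
	fmt.Printf("%q\n", types.ToDiskType("").ReadableString())    // "hdd"
	fmt.Printf("%q\n", types.ToDiskType("nvme").String())        // "nvme"
}
```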
diff --git a/weed/storage/volume.go b/weed/storage/volume.go
index acede66bf..e0638d8a8 100644
--- a/weed/storage/volume.go
+++ b/weed/storage/volume.go
@@ -21,20 +21,23 @@ import (
type Volume struct {
Id needle.VolumeId
dir string
+ dirIdx string
Collection string
DataBackend backend.BackendStorageFile
nm NeedleMapper
- needleMapKind NeedleMapType
+ needleMapKind NeedleMapKind
noWriteOrDelete bool // if readonly, either noWriteOrDelete or noWriteCanDelete
noWriteCanDelete bool // if readonly, either noWriteOrDelete or noWriteCanDelete
+ noWriteLock sync.RWMutex
hasRemoteFile bool // if the volume has a remote file
MemoryMapMaxSizeMb uint32
super_block.SuperBlock
dataFileAccessLock sync.RWMutex
- lastModifiedTsSeconds uint64 //unix time in seconds
- lastAppendAtNs uint64 //unix time in nanoseconds
+ asyncRequestsChan chan *needle.AsyncRequest
+ lastModifiedTsSeconds uint64 // unix time in seconds
+ lastAppendAtNs uint64 // unix time in nanoseconds
lastCompactIndexOffset uint64
lastCompactRevision uint16
@@ -42,18 +45,26 @@ type Volume struct {
isCompacting bool
volumeInfo *volume_server_pb.VolumeInfo
+ location *DiskLocation
+
+ lastIoError error
}
-func NewVolume(dirname string, collection string, id needle.VolumeId, needleMapKind NeedleMapType, replicaPlacement *super_block.ReplicaPlacement, ttl *needle.TTL, preallocate int64, memoryMapMaxSizeMb uint32) (v *Volume, e error) {
+func NewVolume(dirname string, dirIdx string, collection string, id needle.VolumeId, needleMapKind NeedleMapKind, replicaPlacement *super_block.ReplicaPlacement, ttl *needle.TTL, preallocate int64, memoryMapMaxSizeMb uint32) (v *Volume, e error) {
// if replicaPlacement is nil, the superblock will be loaded from disk
- v = &Volume{dir: dirname, Collection: collection, Id: id, MemoryMapMaxSizeMb: memoryMapMaxSizeMb}
+ v = &Volume{dir: dirname, dirIdx: dirIdx, Collection: collection, Id: id, MemoryMapMaxSizeMb: memoryMapMaxSizeMb,
+ asyncRequestsChan: make(chan *needle.AsyncRequest, 128)}
v.SuperBlock = super_block.SuperBlock{ReplicaPlacement: replicaPlacement, Ttl: ttl}
v.needleMapKind = needleMapKind
e = v.load(true, true, needleMapKind, preallocate)
+ v.startWorker()
return
}
+
func (v *Volume) String() string {
- return fmt.Sprintf("Id:%v, dir:%s, Collection:%s, dataFile:%v, nm:%v, noWrite:%v canDelete:%v", v.Id, v.dir, v.Collection, v.DataBackend, v.nm, v.noWriteOrDelete || v.noWriteCanDelete, v.noWriteCanDelete)
+ v.noWriteLock.RLock()
+ defer v.noWriteLock.RUnlock()
+ return fmt.Sprintf("Id:%v dir:%s dirIdx:%s Collection:%s dataFile:%v nm:%v noWrite:%v canDelete:%v", v.Id, v.dir, v.dirIdx, v.Collection, v.DataBackend, v.nm, v.noWriteOrDelete || v.noWriteCanDelete, v.noWriteCanDelete)
}
func VolumeFileName(dir string, collection string, id int) (fileName string) {
@@ -65,10 +76,24 @@ func VolumeFileName(dir string, collection string, id int) (fileName string) {
}
return
}
-func (v *Volume) FileName() (fileName string) {
+
+func (v *Volume) DataFileName() (fileName string) {
return VolumeFileName(v.dir, v.Collection, int(v.Id))
}
+func (v *Volume) IndexFileName() (fileName string) {
+ return VolumeFileName(v.dirIdx, v.Collection, int(v.Id))
+}
+
+func (v *Volume) FileName(ext string) (fileName string) {
+ switch ext {
+ case ".idx", ".cpx", ".ldb":
+ return VolumeFileName(v.dirIdx, v.Collection, int(v.Id)) + ext
+ }
+ // .dat, .cpd, .vif
+ return VolumeFileName(v.dir, v.Collection, int(v.Id)) + ext
+}
+
func (v *Volume) Version() needle.Version {
if v.volumeInfo.Version != 0 {
v.SuperBlock.Version = needle.Version(v.volumeInfo.Version)
@@ -146,6 +171,10 @@ func (v *Volume) IndexFileSize() uint64 {
return v.nm.IndexFileSize()
}
+func (v *Volume) DiskType() types.DiskType {
+ return v.location.DiskType
+}
+
// Close cleanly shuts down this volume
func (v *Volume) Close() {
v.dataFileAccessLock.Lock()
@@ -169,20 +198,20 @@ func (v *Volume) NeedToReplicate() bool {
// except when volume is empty
// or when the volume does not have a ttl
// or when volumeSizeLimit is 0 when server just starts
-func (v *Volume) expired(volumeSizeLimit uint64) bool {
+func (v *Volume) expired(contentSize uint64, volumeSizeLimit uint64) bool {
if volumeSizeLimit == 0 {
- //skip if we don't know size limit
+ // skip if we don't know size limit
return false
}
- if v.ContentSize() == 0 {
+ if contentSize <= super_block.SuperBlockSize {
return false
}
if v.Ttl == nil || v.Ttl.Minutes() == 0 {
return false
}
- glog.V(1).Infof("now:%v lastModified:%v", time.Now().Unix(), v.lastModifiedTsSeconds)
+ glog.V(2).Infof("volume %d now:%v lastModified:%v", v.Id, time.Now().Unix(), v.lastModifiedTsSeconds)
livedMinutes := (time.Now().Unix() - int64(v.lastModifiedTsSeconds)) / 60
- glog.V(1).Infof("ttl:%v lived:%v", v.Ttl, livedMinutes)
+ glog.V(2).Infof("volume %d ttl:%v lived:%v", v.Id, v.Ttl, livedMinutes)
if int64(v.Ttl.Minutes()) < livedMinutes {
return true
}
@@ -205,27 +234,54 @@ func (v *Volume) expiredLongEnough(maxDelayMinutes uint32) bool {
return false
}
-func (v *Volume) ToVolumeInformationMessage() *master_pb.VolumeInformationMessage {
- size, _, modTime := v.FileStat()
+func (v *Volume) collectStatus() (maxFileKey types.NeedleId, datFileSize int64, modTime time.Time, fileCount, deletedCount, deletedSize uint64, ok bool) {
+ v.dataFileAccessLock.RLock()
+ defer v.dataFileAccessLock.RUnlock()
+ glog.V(3).Infof("collectStatus volume %d", v.Id)
- volumInfo := &master_pb.VolumeInformationMessage{
+ if v.nm == nil {
+ return
+ }
+
+ ok = true
+
+ maxFileKey = v.nm.MaxFileKey()
+ datFileSize, modTime, _ = v.DataBackend.GetStat()
+ fileCount = uint64(v.nm.FileCount())
+ deletedCount = uint64(v.nm.DeletedCount())
+ deletedSize = v.nm.DeletedSize()
+
+ return
+}
+
+func (v *Volume) ToVolumeInformationMessage() (types.NeedleId, *master_pb.VolumeInformationMessage) {
+
+ maxFileKey, volumeSize, modTime, fileCount, deletedCount, deletedSize, ok := v.collectStatus()
+
+ if !ok {
+ return 0, nil
+ }
+
+ volumeInfo := &master_pb.VolumeInformationMessage{
Id: uint32(v.Id),
- Size: size,
+ Size: uint64(volumeSize),
Collection: v.Collection,
- FileCount: v.FileCount(),
- DeleteCount: v.DeletedCount(),
- DeletedByteCount: v.DeletedSize(),
- ReadOnly: v.noWriteOrDelete,
+ FileCount: fileCount,
+ DeleteCount: deletedCount,
+ DeletedByteCount: deletedSize,
+ ReadOnly: v.IsReadOnly(),
ReplicaPlacement: uint32(v.ReplicaPlacement.Byte()),
Version: uint32(v.Version()),
Ttl: v.Ttl.ToUint32(),
CompactRevision: uint32(v.SuperBlock.CompactionRevision),
ModifiedAtSecond: modTime.Unix(),
+ DiskType: string(v.location.DiskType),
}
- volumInfo.RemoteStorageName, volumInfo.RemoteStorageKey = v.RemoteStorageNameKey()
+ volumeInfo.RemoteStorageName, volumeInfo.RemoteStorageKey = v.RemoteStorageNameKey()
- return volumInfo
+ return maxFileKey, volumeInfo
}
func (v *Volume) RemoteStorageNameKey() (storageName, storageKey string) {
@@ -237,3 +293,9 @@ func (v *Volume) RemoteStorageNameKey() (storageName, storageKey string) {
}
return v.volumeInfo.GetFiles()[0].BackendName(), v.volumeInfo.GetFiles()[0].GetKey()
}
+
+func (v *Volume) IsReadOnly() bool {
+ v.noWriteLock.RLock()
+ defer v.noWriteLock.RUnlock()
+ return v.noWriteOrDelete || v.noWriteCanDelete || v.location.isDiskSpaceLow
+}
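The split between dir and dirIdx is the noteworthy change here: index-related files (.idx, .cpx, .ldb) can now live on a separate, typically faster, disk. A minimal re-implementation of the routing in FileName for illustration only; the paths and collection name are hypothetical:

```go
package main

import "fmt"

// fileName mirrors Volume.FileName above: index artifacts go to the index
// directory, everything else (.dat, .cpd, .vif) stays with the data.
func fileName(dir, dirIdx, collection string, id int, ext string) string {
	base := fmt.Sprintf("%s_%d", collection, id)
	switch ext {
	case ".idx", ".cpx", ".ldb":
		return dirIdx + "/" + base + ext
	}
	return dir + "/" + base + ext
}

func main() {
	fmt.Println(fileName("/data", "/index", "pics", 3, ".idx")) // /index/pics_3.idx
	fmt.Println(fileName("/data", "/index", "pics", 3, ".dat")) // /data/pics_3.dat
}
```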
diff --git a/weed/storage/volume_backup.go b/weed/storage/volume_backup.go
index ec29c895e..82ea12a89 100644
--- a/weed/storage/volume_backup.go
+++ b/weed/storage/volume_backup.go
@@ -64,8 +64,6 @@ update needle map when receiving new .dat bytes. But seems not necessary now.)
func (v *Volume) IncrementalBackup(volumeServer string, grpcDialOption grpc.DialOption) error {
- ctx := context.Background()
-
startFromOffset, _, _ := v.FileStat()
appendAtNs, err := v.findLastAppendAtNs()
if err != nil {
@@ -76,7 +74,7 @@ func (v *Volume) IncrementalBackup(volumeServer string, grpcDialOption grpc.Dial
err = operation.WithVolumeServerClient(volumeServer, grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
- stream, err := client.VolumeIncrementalCopy(ctx, &volume_server_pb.VolumeIncrementalCopyRequest{
+ stream, err := client.VolumeIncrementalCopy(context.Background(), &volume_server_pb.VolumeIncrementalCopyRequest{
VolumeId: uint32(v.Id),
SinceNs: appendAtNs,
})
@@ -126,9 +124,9 @@ func (v *Volume) findLastAppendAtNs() (uint64, error) {
}
func (v *Volume) locateLastAppendEntry() (Offset, error) {
- indexFile, e := os.OpenFile(v.FileName()+".idx", os.O_RDONLY, 0644)
+ indexFile, e := os.OpenFile(v.FileName(".idx"), os.O_RDONLY, 0644)
if e != nil {
- return Offset{}, fmt.Errorf("cannot read %s.idx: %v", v.FileName(), e)
+ return Offset{}, fmt.Errorf("cannot read %s: %v", v.FileName(".idx"), e)
}
defer indexFile.Close()
@@ -156,13 +154,13 @@ func (v *Volume) locateLastAppendEntry() (Offset, error) {
func (v *Volume) readAppendAtNs(offset Offset) (uint64, error) {
- n, _, bodyLength, err := needle.ReadNeedleHeader(v.DataBackend, v.SuperBlock.Version, offset.ToAcutalOffset())
+ n, _, bodyLength, err := needle.ReadNeedleHeader(v.DataBackend, v.SuperBlock.Version, offset.ToActualOffset())
if err != nil {
- return 0, fmt.Errorf("ReadNeedleHeader: %v", err)
+ return 0, fmt.Errorf("ReadNeedleHeader %s [%d,%d): %v", v.DataBackend.Name(), offset.ToActualOffset(), offset.ToActualOffset()+NeedleHeaderSize, err)
}
- _, err = n.ReadNeedleBody(v.DataBackend, v.SuperBlock.Version, offset.ToAcutalOffset()+int64(NeedleHeaderSize), bodyLength)
+ _, err = n.ReadNeedleBody(v.DataBackend, v.SuperBlock.Version, offset.ToActualOffset()+NeedleHeaderSize, bodyLength)
if err != nil {
- return 0, fmt.Errorf("ReadNeedleBody offset %d, bodyLength %d: %v", offset.ToAcutalOffset(), bodyLength, err)
+ return 0, fmt.Errorf("ReadNeedleBody offset %d, bodyLength %d: %v", offset.ToActualOffset(), bodyLength, err)
}
return n.AppendAtNs, nil
@@ -170,25 +168,13 @@ func (v *Volume) readAppendAtNs(offset Offset) (uint64, error) {
// on server side
func (v *Volume) BinarySearchByAppendAtNs(sinceNs uint64) (offset Offset, isLast bool, err error) {
- indexFile, openErr := os.OpenFile(v.FileName()+".idx", os.O_RDONLY, 0644)
- if openErr != nil {
- err = fmt.Errorf("cannot read %s.idx: %v", v.FileName(), openErr)
- return
- }
- defer indexFile.Close()
- fi, statErr := indexFile.Stat()
- if statErr != nil {
- err = fmt.Errorf("file %s stat error: %v", indexFile.Name(), statErr)
- return
- }
- fileSize := fi.Size()
+ fileSize := int64(v.IndexFileSize())
if fileSize%NeedleMapEntrySize != 0 {
- err = fmt.Errorf("unexpected file %s size: %d", indexFile.Name(), fileSize)
+ err = fmt.Errorf("unexpected file %s.idx size: %d", v.IndexFileName(), fileSize)
return
}
- bytes := make([]byte, NeedleMapEntrySize)
entryCount := fileSize / NeedleMapEntrySize
l := int64(0)
h := entryCount
@@ -202,7 +188,7 @@ func (v *Volume) BinarySearchByAppendAtNs(sinceNs uint64) (offset Offset, isLast
}
// read the appendAtNs for entry m
- offset, err = v.readAppendAtNsForIndexEntry(indexFile, bytes, m)
+ offset, err = v.readOffsetFromIndex(m)
if err != nil {
return
}
@@ -226,19 +212,21 @@ func (v *Volume) BinarySearchByAppendAtNs(sinceNs uint64) (offset Offset, isLast
return Offset{}, true, nil
}
- offset, err = v.readAppendAtNsForIndexEntry(indexFile, bytes, l)
+ offset, err = v.readOffsetFromIndex(l)
return offset, false, err
}
// bytes is of size NeedleMapEntrySize
-func (v *Volume) readAppendAtNsForIndexEntry(indexFile *os.File, bytes []byte, m int64) (Offset, error) {
- if _, readErr := indexFile.ReadAt(bytes, m*NeedleMapEntrySize); readErr != nil && readErr != io.EOF {
- return Offset{}, readErr
+func (v *Volume) readOffsetFromIndex(m int64) (Offset, error) {
+ v.dataFileAccessLock.RLock()
+ defer v.dataFileAccessLock.RUnlock()
+ if v.nm == nil {
+ return Offset{}, io.EOF
}
- _, offset, _ := idx.IdxFileEntry(bytes)
- return offset, nil
+ _, offset, _, err := v.nm.ReadIndexEntry(m)
+ return offset, err
}
// generate the volume idx
@@ -255,7 +243,7 @@ func (scanner *VolumeFileScanner4GenIdx) ReadNeedleBody() bool {
}
func (scanner *VolumeFileScanner4GenIdx) VisitNeedle(n *needle.Needle, offset int64, needleHeader, needleBody []byte) error {
- if n.Size > 0 && n.Size != TombstoneFileSize {
+ if n.Size > 0 && n.Size.IsValid() {
return scanner.v.nm.Put(n.Id, ToOffset(offset), n.Size)
}
return scanner.v.nm.Delete(n.Id, ToOffset(offset))
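readOffsetFromIndex now goes through the needle map under the access lock, but the search strategy in BinarySearchByAppendAtNs is unchanged: index entries are append-ordered by timestamp, so a lower-bound binary search finds the first entry newer than sinceNs. A condensed sketch, with readAppendAtNs as a hypothetical accessor:

```go
package main

import "fmt"

// searchSince returns the index of the first entry whose appendAtNs is
// greater than sinceNs; entryCount means nothing newer exists.
func searchSince(entryCount int64, sinceNs uint64, readAppendAtNs func(int64) uint64) int64 {
	l, h := int64(0), entryCount
	for l < h {
		m := (l + h) / 2
		if readAppendAtNs(m) <= sinceNs {
			l = m + 1 // entry m is old enough, look right
		} else {
			h = m // entry m is newer, keep it in range
		}
	}
	return l
}

func main() {
	ts := []uint64{10, 20, 30, 40}
	idx := searchSince(4, 25, func(m int64) uint64 { return ts[m] })
	fmt.Println(idx) // 2: first entry appended after ns 25
}
```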
diff --git a/weed/storage/volume_checking.go b/weed/storage/volume_checking.go
index a65c2a3ff..b76933083 100644
--- a/weed/storage/volume_checking.go
+++ b/weed/storage/volume_checking.go
@@ -2,8 +2,11 @@ package storage
import (
"fmt"
+ "github.com/chrislusf/seaweedfs/weed/storage/super_block"
+ "io"
"os"
+ "github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/storage/backend"
"github.com/chrislusf/seaweedfs/weed/storage/idx"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
@@ -11,29 +14,56 @@ import (
"github.com/chrislusf/seaweedfs/weed/util"
)
-func CheckVolumeDataIntegrity(v *Volume, indexFile *os.File) (lastAppendAtNs uint64, e error) {
+func CheckAndFixVolumeDataIntegrity(v *Volume, indexFile *os.File) (lastAppendAtNs uint64, err error) {
var indexSize int64
- if indexSize, e = verifyIndexFileIntegrity(indexFile); e != nil {
- return 0, fmt.Errorf("verifyIndexFileIntegrity %s failed: %v", indexFile.Name(), e)
+ if indexSize, err = verifyIndexFileIntegrity(indexFile); err != nil {
+ return 0, fmt.Errorf("verifyIndexFileIntegrity %s failed: %v", indexFile.Name(), err)
}
if indexSize == 0 {
return 0, nil
}
+ healthyIndexSize := indexSize
+ for i := 1; i <= 10 && indexSize >= int64(i)*NeedleMapEntrySize; i++ {
+ // check and fix last 10 entries
+ lastAppendAtNs, err = doCheckAndFixVolumeData(v, indexFile, indexSize-int64(i)*NeedleMapEntrySize)
+ if err == io.EOF {
+ healthyIndexSize = indexSize - int64(i)*NeedleMapEntrySize
+ continue
+ }
+ if err != ErrorSizeMismatch {
+ break
+ }
+ }
+ if healthyIndexSize < indexSize {
+ glog.Warningf("CheckAndFixVolumeDataIntegrity truncate idx file %s from %d to %d", indexFile.Name(), indexSize, healthyIndexSize)
+ err = indexFile.Truncate(healthyIndexSize)
+ if err != nil {
+ glog.Warningf("CheckAndFixVolumeDataIntegrity truncate idx file %s from %d to %d: %v", indexFile.Name(), indexSize, healthyIndexSize, err)
+ }
+ }
+ return
+}
+
+func doCheckAndFixVolumeData(v *Volume, indexFile *os.File, indexOffset int64) (lastAppendAtNs uint64, err error) {
var lastIdxEntry []byte
- if lastIdxEntry, e = readIndexEntryAtOffset(indexFile, indexSize-NeedleMapEntrySize); e != nil {
- return 0, fmt.Errorf("readLastIndexEntry %s failed: %v", indexFile.Name(), e)
+ if lastIdxEntry, err = readIndexEntryAtOffset(indexFile, indexOffset); err != nil {
+ return 0, fmt.Errorf("readLastIndexEntry %s failed: %v", indexFile.Name(), err)
}
key, offset, size := idx.IdxFileEntry(lastIdxEntry)
if offset.IsZero() {
return 0, nil
}
- if size == TombstoneFileSize {
- size = 0
- }
- if lastAppendAtNs, e = verifyNeedleIntegrity(v.DataBackend, v.Version(), offset.ToAcutalOffset(), key, size); e != nil {
- return lastAppendAtNs, fmt.Errorf("verifyNeedleIntegrity %s failed: %v", indexFile.Name(), e)
+ if size < 0 {
+ // read the deletion entry
+ if lastAppendAtNs, err = verifyDeletedNeedleIntegrity(v.DataBackend, v.Version(), key); err != nil {
+ return lastAppendAtNs, fmt.Errorf("verifyNeedleIntegrity %s failed: %v", indexFile.Name(), err)
+ }
+ } else {
+ if lastAppendAtNs, err = verifyNeedleIntegrity(v.DataBackend, v.Version(), offset.ToActualOffset(), key, size); err != nil {
+ return lastAppendAtNs, err
+ }
}
- return
+ return lastAppendAtNs, nil
}
func verifyIndexFileIntegrity(indexFile *os.File) (indexSize int64, err error) {
@@ -55,13 +85,82 @@ func readIndexEntryAtOffset(indexFile *os.File, offset int64) (bytes []byte, err
return
}
-func verifyNeedleIntegrity(datFile backend.BackendStorageFile, v needle.Version, offset int64, key NeedleId, size uint32) (lastAppendAtNs uint64, err error) {
- n := new(needle.Needle)
+func verifyNeedleIntegrity(datFile backend.BackendStorageFile, v needle.Version, offset int64, key NeedleId, size Size) (lastAppendAtNs uint64, err error) {
+ n, _, _, err := needle.ReadNeedleHeader(datFile, v, offset)
+ if err == io.EOF {
+ return 0, err
+ }
+ if err != nil {
+ return 0, fmt.Errorf("read %s at %d", datFile.Name(), offset)
+ }
+ if n.Size != size {
+ return 0, ErrorSizeMismatch
+ }
+ if v == needle.Version3 {
+ bytes := make([]byte, TimestampSize)
+ _, err = datFile.ReadAt(bytes, offset+NeedleHeaderSize+int64(size)+needle.NeedleChecksumSize)
+ if err == io.EOF {
+ return 0, err
+ }
+ if err != nil {
+ return 0, fmt.Errorf("verifyNeedleIntegrity check %s entry offset %d size %d: %v", datFile.Name(), offset, size, err)
+ }
+ n.AppendAtNs = util.BytesToUint64(bytes)
+ fileTailOffset := offset + needle.GetActualSize(size, v)
+ fileSize, _, err := datFile.GetStat()
+ if err != nil {
+ return 0, fmt.Errorf("stat file %s: %v", datFile.Name(), err)
+ }
+ if fileSize == fileTailOffset {
+ return n.AppendAtNs, nil
+ }
+ if fileSize > fileTailOffset {
+ glog.Warningf("Truncate %s from %d bytes to %d bytes!", datFile.Name(), fileSize, fileTailOffset)
+ err = datFile.Truncate(fileTailOffset)
+ if err == nil {
+ return n.AppendAtNs, nil
+ }
+ return n.AppendAtNs, fmt.Errorf("truncate file %s: %v", datFile.Name(), err)
+ }
+ glog.Warningf("data file %s has %d bytes, less than expected %d bytes!", datFile.Name(), fileSize, fileTailOffset)
+ }
if err = n.ReadData(datFile, offset, size, v); err != nil {
- return n.AppendAtNs, err
+ return n.AppendAtNs, fmt.Errorf("read data [%d,%d) : %v", offset, offset+int64(size), err)
+ }
+ if n.Id != key {
+ return n.AppendAtNs, fmt.Errorf("index key %#x does not match needle's Id %#x", key, n.Id)
+ }
+ return n.AppendAtNs, err
+}
+
+func verifyDeletedNeedleIntegrity(datFile backend.BackendStorageFile, v needle.Version, key NeedleId) (lastAppendAtNs uint64, err error) {
+ n := new(needle.Needle)
+ size := n.DiskSize(v)
+ var fileSize int64
+ fileSize, _, err = datFile.GetStat()
+ if err != nil {
+ return 0, fmt.Errorf("GetStat: %v", err)
+ }
+ if err = n.ReadData(datFile, fileSize-size, Size(0), v); err != nil {
+ return n.AppendAtNs, fmt.Errorf("read data [%d,%d) : %v", fileSize-size, size, err)
}
if n.Id != key {
return n.AppendAtNs, fmt.Errorf("index key %#x does not match needle's Id %#x", key, n.Id)
}
return n.AppendAtNs, err
}
+
+func (v *Volume) checkIdxFile() error {
+ datFileSize, _, err := v.DataBackend.GetStat()
+ if err != nil {
+ return fmt.Errorf("get stat %s: %v", v.FileName(".dat"), err)
+ }
+ if datFileSize <= super_block.SuperBlockSize {
+ return nil
+ }
+ indexFileName := v.FileName(".idx")
+ if util.FileExists(indexFileName) {
+ return nil
+ }
+ return fmt.Errorf("idx file %s does not exists", indexFileName)
+}
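The new repair loop in CheckAndFixVolumeDataIntegrity walks backwards over at most the last 10 index entries and truncates the .idx file to the last healthy size. A condensed, assumption-laden sketch of that loop; errSizeMismatch stands in for the package's ErrorSizeMismatch and check for doCheckAndFixVolumeData:

```go
package main

import (
	"errors"
	"fmt"
	"io"
)

var errSizeMismatch = errors.New("size mismatch")

// fixTail mirrors the scan above: an entry whose needle reads past EOF moves
// the healthy boundary down, a size mismatch keeps scanning, anything else stops.
func fixTail(indexSize, entrySize int64, check func(offset int64) error) (healthy int64) {
	healthy = indexSize
	for i := int64(1); i <= 10 && indexSize >= i*entrySize; i++ {
		err := check(indexSize - i*entrySize)
		if err == io.EOF {
			healthy = indexSize - i*entrySize
			continue
		}
		if err != errSizeMismatch {
			break
		}
	}
	return healthy // caller truncates the .idx file to this size if it shrank
}

func main() {
	// pretend the last entry's needle is past EOF: healthy size drops by one entry
	healthy := fixTail(160, 16, func(off int64) error {
		if off == 144 {
			return io.EOF
		}
		return nil
	})
	fmt.Println(healthy) // 144
}
```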
diff --git a/weed/storage/volume_create.go b/weed/storage/volume_create.go
deleted file mode 100644
index ffcb246a4..000000000
--- a/weed/storage/volume_create.go
+++ /dev/null
@@ -1,21 +0,0 @@
-// +build !linux,!windows
-
-package storage
-
-import (
- "os"
-
- "github.com/chrislusf/seaweedfs/weed/glog"
- "github.com/chrislusf/seaweedfs/weed/storage/backend"
-)
-
-func createVolumeFile(fileName string, preallocate int64, memoryMapSizeMB uint32) (backend.BackendStorageFile, error) {
- file, e := os.OpenFile(fileName, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0644)
- if e != nil {
- return nil, e
- }
- if preallocate > 0 {
- glog.V(0).Infof("Preallocated disk space for %s is not supported", fileName)
- }
- return backend.NewDiskFile(file), nil
-}
diff --git a/weed/storage/volume_info.go b/weed/storage/volume_info.go
index 313818cde..9c64c9682 100644
--- a/weed/storage/volume_info.go
+++ b/weed/storage/volume_info.go
@@ -14,6 +14,7 @@ type VolumeInfo struct {
Size uint64
ReplicaPlacement *super_block.ReplicaPlacement
Ttl *needle.TTL
+ DiskType string
Collection string
Version needle.Version
FileCount int
@@ -40,6 +41,7 @@ func NewVolumeInfo(m *master_pb.VolumeInformationMessage) (vi VolumeInfo, err er
ModifiedAtSecond: m.ModifiedAtSecond,
RemoteStorageName: m.RemoteStorageName,
RemoteStorageKey: m.RemoteStorageKey,
+ DiskType: m.DiskType,
}
rp, e := super_block.NewReplicaPlacementFromByte(byte(m.ReplicaPlacement))
if e != nil {
@@ -62,6 +64,7 @@ func NewVolumeInfoFromShort(m *master_pb.VolumeShortInformationMessage) (vi Volu
}
vi.ReplicaPlacement = rp
vi.Ttl = needle.LoadTTLFromUint32(m.Ttl)
+ vi.DiskType = m.DiskType
return vi, nil
}
@@ -90,6 +93,7 @@ func (vi VolumeInfo) ToVolumeInformationMessage() *master_pb.VolumeInformationMe
ModifiedAtSecond: vi.ModifiedAtSecond,
RemoteStorageName: vi.RemoteStorageName,
RemoteStorageKey: vi.RemoteStorageKey,
+ DiskType: vi.DiskType,
}
}
diff --git a/weed/storage/volume_loading.go b/weed/storage/volume_loading.go
index fa1f7d617..0cf603ad8 100644
--- a/weed/storage/volume_loading.go
+++ b/weed/storage/volume_loading.go
@@ -14,7 +14,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/util"
)
-func loadVolumeWithoutIndex(dirname string, collection string, id needle.VolumeId, needleMapKind NeedleMapType) (v *Volume, err error) {
+func loadVolumeWithoutIndex(dirname string, collection string, id needle.VolumeId, needleMapKind NeedleMapKind) (v *Volume, err error) {
v = &Volume{dir: dirname, Collection: collection, Id: id}
v.SuperBlock = super_block.SuperBlock{}
v.needleMapKind = needleMapKind
@@ -22,31 +22,42 @@ func loadVolumeWithoutIndex(dirname string, collection string, id needle.VolumeI
return
}
-func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool, needleMapKind NeedleMapType, preallocate int64) (err error) {
- fileName := v.FileName()
+func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool, needleMapKind NeedleMapKind, preallocate int64) (err error) {
alreadyHasSuperBlock := false
- if !v.maybeLoadVolumeInfo() {
- v.SaveVolumeInfo()
- }
+ hasLoadedVolume := false
+ defer func() {
+ if !hasLoadedVolume {
+ if v.nm != nil {
+ v.nm.Close()
+ v.nm = nil
+ }
+ if v.DataBackend != nil {
+ v.DataBackend.Close()
+ v.DataBackend = nil
+ }
+ }
+ }()
+
+ hasVolumeInfoFile := v.maybeLoadVolumeInfo()
if v.HasRemoteFile() {
v.noWriteCanDelete = true
v.noWriteOrDelete = false
- glog.V(0).Infof("loading volume %d from remote %v", v.Id, v.volumeInfo.Files)
+ glog.V(0).Infof("loading volume %d from remote %v", v.Id, v.volumeInfo)
v.LoadRemoteFile()
alreadyHasSuperBlock = true
- } else if exists, canRead, canWrite, modifiedTime, fileSize := util.CheckFile(fileName + ".dat"); exists {
+ } else if exists, canRead, canWrite, modifiedTime, fileSize := util.CheckFile(v.FileName(".dat")); exists {
// open dat file
if !canRead {
- return fmt.Errorf("cannot read Volume Data file %s.dat", fileName)
+ return fmt.Errorf("cannot read Volume Data file %s", v.FileName(".dat"))
}
var dataFile *os.File
if canWrite {
- dataFile, err = os.OpenFile(fileName+".dat", os.O_RDWR|os.O_CREATE, 0644)
+ dataFile, err = os.OpenFile(v.FileName(".dat"), os.O_RDWR|os.O_CREATE, 0644)
} else {
- glog.V(0).Infoln("opening " + fileName + ".dat in READONLY mode")
- dataFile, err = os.Open(fileName + ".dat")
+ glog.V(0).Infof("opening %s in READONLY mode", v.FileName(".dat"))
+ dataFile, err = os.Open(v.FileName(".dat"))
v.noWriteOrDelete = true
}
v.lastModifiedTsSeconds = uint64(modifiedTime.Unix())
@@ -56,92 +67,117 @@ func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool, needleMapKind
v.DataBackend = backend.NewDiskFile(dataFile)
} else {
if createDatIfMissing {
- v.DataBackend, err = createVolumeFile(fileName+".dat", preallocate, v.MemoryMapMaxSizeMb)
+ v.DataBackend, err = backend.CreateVolumeFile(v.FileName(".dat"), preallocate, v.MemoryMapMaxSizeMb)
} else {
- return fmt.Errorf("Volume Data file %s.dat does not exist.", fileName)
+ return fmt.Errorf("volume data file %s does not exist", v.FileName(".dat"))
}
}
if err != nil {
if !os.IsPermission(err) {
- return fmt.Errorf("cannot load Volume Data %s.dat: %v", fileName, err)
+ return fmt.Errorf("cannot load volume data %s: %v", v.FileName(".dat"), err)
} else {
- return fmt.Errorf("load data file %s.dat: %v", fileName, err)
+ return fmt.Errorf("load data file %s: %v", v.FileName(".dat"), err)
}
}
if alreadyHasSuperBlock {
err = v.readSuperBlock()
+ glog.V(0).Infof("readSuperBlock volume %d version %v", v.Id, v.SuperBlock.Version)
+ if err != nil && v.HasRemoteFile() {
+ // maybe temporary network problem
+ glog.Errorf("readSuperBlock remote volume %d: %v", v.Id, err)
+ err = nil
+ }
} else {
if !v.SuperBlock.Initialized() {
- return fmt.Errorf("volume %s.dat not initialized", fileName)
+ return fmt.Errorf("volume %s not initialized", v.FileName(".dat"))
}
err = v.maybeWriteSuperBlock()
}
if err == nil && alsoLoadIndex {
+ // adjust for existing volumes with .idx together with .dat files
+ if v.dirIdx != v.dir {
+ if util.FileExists(v.DataFileName() + ".idx") {
+ v.dirIdx = v.dir
+ }
+ }
+ // check volume idx files
+ if err := v.checkIdxFile(); err != nil {
+ glog.Fatalf("check volume idx file %s: %v", v.FileName(".idx"), err)
+ }
var indexFile *os.File
if v.noWriteOrDelete {
- glog.V(0).Infoln("open to read file", fileName+".idx")
- if indexFile, err = os.OpenFile(fileName+".idx", os.O_RDONLY, 0644); err != nil {
- return fmt.Errorf("cannot read Volume Index %s.idx: %v", fileName, err)
+ glog.V(0).Infoln("open to read file", v.FileName(".idx"))
+ if indexFile, err = os.OpenFile(v.FileName(".idx"), os.O_RDONLY, 0644); err != nil {
+ return fmt.Errorf("cannot read Volume Index %s: %v", v.FileName(".idx"), err)
}
} else {
- glog.V(1).Infoln("open to write file", fileName+".idx")
- if indexFile, err = os.OpenFile(fileName+".idx", os.O_RDWR|os.O_CREATE, 0644); err != nil {
- return fmt.Errorf("cannot write Volume Index %s.idx: %v", fileName, err)
+ glog.V(1).Infoln("open to write file", v.FileName(".idx"))
+ if indexFile, err = os.OpenFile(v.FileName(".idx"), os.O_RDWR|os.O_CREATE, 0644); err != nil {
+ return fmt.Errorf("cannot write Volume Index %s: %v", v.FileName(".idx"), err)
}
}
- if v.lastAppendAtNs, err = CheckVolumeDataIntegrity(v, indexFile); err != nil {
+ if v.lastAppendAtNs, err = CheckAndFixVolumeDataIntegrity(v, indexFile); err != nil {
v.noWriteOrDelete = true
glog.V(0).Infof("volumeDataIntegrityChecking failed %v", err)
}
if v.noWriteOrDelete || v.noWriteCanDelete {
- if v.nm, err = NewSortedFileNeedleMap(fileName, indexFile); err != nil {
- glog.V(0).Infof("loading sorted db %s error: %v", fileName+".sdx", err)
+ if v.nm, err = NewSortedFileNeedleMap(v.IndexFileName(), indexFile); err != nil {
+ glog.V(0).Infof("loading sorted db %s error: %v", v.FileName(".sdx"), err)
}
} else {
switch needleMapKind {
case NeedleMapInMemory:
- glog.V(0).Infoln("loading index", fileName+".idx", "to memory")
+ glog.V(0).Infoln("loading index", v.FileName(".idx"), "to memory")
if v.nm, err = LoadCompactNeedleMap(indexFile); err != nil {
- glog.V(0).Infof("loading index %s to memory error: %v", fileName+".idx", err)
+ glog.V(0).Infof("loading index %s to memory error: %v", v.FileName(".idx"), err)
}
case NeedleMapLevelDb:
- glog.V(0).Infoln("loading leveldb", fileName+".ldb")
+ glog.V(0).Infoln("loading leveldb", v.FileName(".ldb"))
opts := &opt.Options{
BlockCacheCapacity: 2 * 1024 * 1024, // default value is 8MiB
WriteBuffer: 1 * 1024 * 1024, // default value is 4MiB
CompactionTableSizeMultiplier: 10, // default value is 1
}
- if v.nm, err = NewLevelDbNeedleMap(fileName+".ldb", indexFile, opts); err != nil {
- glog.V(0).Infof("loading leveldb %s error: %v", fileName+".ldb", err)
+ if v.nm, err = NewLevelDbNeedleMap(v.FileName(".ldb"), indexFile, opts); err != nil {
+ glog.V(0).Infof("loading leveldb %s error: %v", v.FileName(".ldb"), err)
}
case NeedleMapLevelDbMedium:
- glog.V(0).Infoln("loading leveldb medium", fileName+".ldb")
+ glog.V(0).Infoln("loading leveldb medium", v.FileName(".ldb"))
opts := &opt.Options{
BlockCacheCapacity: 4 * 1024 * 1024, // default value is 8MiB
WriteBuffer: 2 * 1024 * 1024, // default value is 4MiB
CompactionTableSizeMultiplier: 10, // default value is 1
}
- if v.nm, err = NewLevelDbNeedleMap(fileName+".ldb", indexFile, opts); err != nil {
- glog.V(0).Infof("loading leveldb %s error: %v", fileName+".ldb", err)
+ if v.nm, err = NewLevelDbNeedleMap(v.FileName(".ldb"), indexFile, opts); err != nil {
+ glog.V(0).Infof("loading leveldb %s error: %v", v.FileName(".ldb"), err)
}
case NeedleMapLevelDbLarge:
- glog.V(0).Infoln("loading leveldb large", fileName+".ldb")
+ glog.V(0).Infoln("loading leveldb large", v.FileName(".ldb"))
opts := &opt.Options{
BlockCacheCapacity: 8 * 1024 * 1024, // default value is 8MiB
WriteBuffer: 4 * 1024 * 1024, // default value is 4MiB
CompactionTableSizeMultiplier: 10, // default value is 1
}
- if v.nm, err = NewLevelDbNeedleMap(fileName+".ldb", indexFile, opts); err != nil {
- glog.V(0).Infof("loading leveldb %s error: %v", fileName+".ldb", err)
+ if v.nm, err = NewLevelDbNeedleMap(v.FileName(".ldb"), indexFile, opts); err != nil {
+ glog.V(0).Infof("loading leveldb %s error: %v", v.FileName(".ldb"), err)
}
}
}
}
+ if !hasVolumeInfoFile {
+ v.volumeInfo.Version = uint32(v.SuperBlock.Version)
+ v.SaveVolumeInfo()
+ }
+
stats.VolumeServerVolumeCounter.WithLabelValues(v.Collection, "volume").Inc()
+ if err == nil {
+ hasLoadedVolume = true
+ }
+
return err
}
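
The three leveldb-backed needle-map kinds above differ only in how the cache is sized: each step up doubles the block cache and the write buffer, while the compaction table size multiplier stays at 10. A minimal sketch of that scaling, assuming the same goleveldb opt package the code imports (levelDbOptionsFor is a hypothetical helper; the real load() builds the options inline per case):

package storage

import "github.com/syndtr/goleveldb/leveldb/opt"

// levelDbOptionsFor mirrors the presets in load(): block cache / write
// buffer of 2/1 MiB (default), 4/2 MiB (medium), 8/4 MiB (large).
func levelDbOptionsFor(kind string) *opt.Options {
	const mb = 1 << 20
	cacheMb := 2
	switch kind {
	case "medium":
		cacheMb = 4
	case "large":
		cacheMb = 8
	}
	return &opt.Options{
		BlockCacheCapacity:            cacheMb * mb,
		WriteBuffer:                   cacheMb / 2 * mb,
		CompactionTableSizeMultiplier: 10, // fewer levels, more write amplification
	}
}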
diff --git a/weed/storage/volume_read.go b/weed/storage/volume_read.go
new file mode 100644
index 000000000..f689eeec0
--- /dev/null
+++ b/weed/storage/volume_read.go
@@ -0,0 +1,131 @@
+package storage
+
+import (
+ "fmt"
+ "io"
+ "time"
+
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/storage/backend"
+ "github.com/chrislusf/seaweedfs/weed/storage/needle"
+ "github.com/chrislusf/seaweedfs/weed/storage/super_block"
+ . "github.com/chrislusf/seaweedfs/weed/storage/types"
+)
+
+// readNeedle fills in the Needle's content by looking up n.Id in the NeedleMapper
+func (v *Volume) readNeedle(n *needle.Needle, readOption *ReadOption) (int, error) {
+ v.dataFileAccessLock.RLock()
+ defer v.dataFileAccessLock.RUnlock()
+
+ nv, ok := v.nm.Get(n.Id)
+ if !ok || nv.Offset.IsZero() {
+ return -1, ErrorNotFound
+ }
+ readSize := nv.Size
+ if readSize.IsDeleted() {
+ if readOption != nil && readOption.ReadDeleted && readSize != TombstoneFileSize {
+ glog.V(3).Infof("reading deleted %s", n.String())
+ readSize = -readSize
+ } else {
+ return -1, ErrorDeleted
+ }
+ }
+ if readSize == 0 {
+ return 0, nil
+ }
+ err := n.ReadData(v.DataBackend, nv.Offset.ToActualOffset(), readSize, v.Version())
+ if err == needle.ErrorSizeMismatch && OffsetSize == 4 {
+ err = n.ReadData(v.DataBackend, nv.Offset.ToActualOffset()+int64(MaxPossibleVolumeSize), readSize, v.Version())
+ }
+ v.checkReadWriteError(err)
+ if err != nil {
+ return 0, err
+ }
+ bytesRead := len(n.Data)
+ if !n.HasTtl() {
+ return bytesRead, nil
+ }
+ ttlMinutes := n.Ttl.Minutes()
+ if ttlMinutes == 0 {
+ return bytesRead, nil
+ }
+ if !n.HasLastModifiedDate() {
+ return bytesRead, nil
+ }
+ if time.Now().Before(time.Unix(0, int64(n.AppendAtNs)).Add(time.Duration(ttlMinutes) * time.Minute)) {
+ return bytesRead, nil
+ }
+ return -1, ErrorNotFound
+}
+
+// ReadNeedleBlob reads the raw needle bytes at the given offset and size
+func (v *Volume) ReadNeedleBlob(offset int64, size Size) ([]byte, error) {
+ v.dataFileAccessLock.RLock()
+ defer v.dataFileAccessLock.RUnlock()
+
+ return needle.ReadNeedleBlob(v.DataBackend, offset, size, v.Version())
+}
+
+type VolumeFileScanner interface {
+ VisitSuperBlock(super_block.SuperBlock) error
+ ReadNeedleBody() bool
+ VisitNeedle(n *needle.Needle, offset int64, needleHeader, needleBody []byte) error
+}
+
+func ScanVolumeFile(dirname string, collection string, id needle.VolumeId,
+ needleMapKind NeedleMapKind,
+ volumeFileScanner VolumeFileScanner) (err error) {
+ var v *Volume
+ if v, err = loadVolumeWithoutIndex(dirname, collection, id, needleMapKind); err != nil {
+ return fmt.Errorf("failed to load volume %d: %v", id, err)
+ }
+ if err = volumeFileScanner.VisitSuperBlock(v.SuperBlock); err != nil {
+ return fmt.Errorf("failed to process volume %d super block: %v", id, err)
+ }
+ defer v.Close()
+
+ version := v.Version()
+
+ offset := int64(v.SuperBlock.BlockSize())
+
+ return ScanVolumeFileFrom(version, v.DataBackend, offset, volumeFileScanner)
+}
+
+func ScanVolumeFileFrom(version needle.Version, datBackend backend.BackendStorageFile, offset int64, volumeFileScanner VolumeFileScanner) (err error) {
+ n, nh, rest, e := needle.ReadNeedleHeader(datBackend, version, offset)
+ if e != nil {
+ if e == io.EOF {
+ return nil
+ }
+ return fmt.Errorf("cannot read %s at offset %d: %v", datBackend.Name(), offset, e)
+ }
+ for n != nil {
+ var needleBody []byte
+ if volumeFileScanner.ReadNeedleBody() {
+ // println("needle", n.Id.String(), "offset", offset, "size", n.Size, "rest", rest)
+ if needleBody, err = n.ReadNeedleBody(datBackend, version, offset+NeedleHeaderSize, rest); err != nil {
+ glog.V(0).Infof("cannot read needle head [%d, %d) body [%d, %d) body length %d: %v", offset, offset+NeedleHeaderSize, offset+NeedleHeaderSize, offset+NeedleHeaderSize+rest, rest, err)
+ // err = fmt.Errorf("cannot read needle body: %v", err)
+ // return
+ }
+ }
+ err := volumeFileScanner.VisitNeedle(n, offset, nh, needleBody)
+ if err == io.EOF {
+ return nil
+ }
+ if err != nil {
+ glog.V(0).Infof("visit needle error: %v", err)
+ return fmt.Errorf("visit needle error: %v", err)
+ }
+ offset += NeedleHeaderSize + rest
+ glog.V(4).Infof("==> new entry offset %d", offset)
+ if n, nh, rest, err = needle.ReadNeedleHeader(datBackend, version, offset); err != nil {
+ if err == io.EOF {
+ return nil
+ }
+ return fmt.Errorf("cannot read needle header at offset %d: %v", offset, err)
+ }
+ glog.V(4).Infof("new entry needle size:%d rest:%d", n.Size, rest)
+ }
+ return nil
+}
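
The VolumeFileScanner interface above is the extension point for ScanVolumeFile and ScanVolumeFileFrom; returning false from ReadNeedleBody lets the scan skip the body reads entirely. A hedged sketch of a scanner that merely counts needle records (needleCounter is hypothetical, not part of this package):

package storage

import (
	"github.com/chrislusf/seaweedfs/weed/storage/needle"
	"github.com/chrislusf/seaweedfs/weed/storage/super_block"
)

// needleCounter tallies needle records without reading their bodies.
type needleCounter struct {
	version needle.Version
	count   int
}

func (s *needleCounter) VisitSuperBlock(sb super_block.SuperBlock) error {
	s.version = sb.Version // remember the volume version for later use
	return nil
}

// ReadNeedleBody returns false so the scan decodes headers only.
func (s *needleCounter) ReadNeedleBody() bool { return false }

func (s *needleCounter) VisitNeedle(n *needle.Needle, offset int64, needleHeader, needleBody []byte) error {
	s.count++
	return nil
}

It would be driven with something like ScanVolumeFile(dir, collection, id, NeedleMapInMemory, &needleCounter{}).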
diff --git a/weed/storage/volume_read_write.go b/weed/storage/volume_read_write.go
deleted file mode 100644
index 0aa3f794b..000000000
--- a/weed/storage/volume_read_write.go
+++ /dev/null
@@ -1,237 +0,0 @@
-package storage
-
-import (
- "bytes"
- "errors"
- "fmt"
- "io"
- "os"
- "time"
-
- "github.com/chrislusf/seaweedfs/weed/glog"
- "github.com/chrislusf/seaweedfs/weed/storage/backend"
- "github.com/chrislusf/seaweedfs/weed/storage/needle"
- "github.com/chrislusf/seaweedfs/weed/storage/super_block"
- . "github.com/chrislusf/seaweedfs/weed/storage/types"
-)
-
-var ErrorNotFound = errors.New("not found")
-
-// isFileUnchanged checks whether this needle to write is same as last one.
-// It requires serialized access in the same volume.
-func (v *Volume) isFileUnchanged(n *needle.Needle) bool {
- if v.Ttl.String() != "" {
- return false
- }
-
- nv, ok := v.nm.Get(n.Id)
- if ok && !nv.Offset.IsZero() && nv.Size != TombstoneFileSize {
- oldNeedle := new(needle.Needle)
- err := oldNeedle.ReadData(v.DataBackend, nv.Offset.ToAcutalOffset(), nv.Size, v.Version())
- if err != nil {
- glog.V(0).Infof("Failed to check updated file at offset %d size %d: %v", nv.Offset.ToAcutalOffset(), nv.Size, err)
- return false
- }
- if oldNeedle.Cookie == n.Cookie && oldNeedle.Checksum == n.Checksum && bytes.Equal(oldNeedle.Data, n.Data) {
- n.DataSize = oldNeedle.DataSize
- return true
- }
- }
- return false
-}
-
-// Destroy removes everything related to this volume
-func (v *Volume) Destroy() (err error) {
- if v.isCompacting {
- err = fmt.Errorf("volume %d is compacting", v.Id)
- return
- }
- storageName, storageKey := v.RemoteStorageNameKey()
- if v.HasRemoteFile() && storageName != "" && storageKey != "" {
- if backendStorage, found := backend.BackendStorages[storageName]; found {
- backendStorage.DeleteFile(storageKey)
- }
- }
- v.Close()
- os.Remove(v.FileName() + ".dat")
- os.Remove(v.FileName() + ".idx")
- os.Remove(v.FileName() + ".vif")
- os.Remove(v.FileName() + ".sdx")
- os.Remove(v.FileName() + ".cpd")
- os.Remove(v.FileName() + ".cpx")
- os.RemoveAll(v.FileName() + ".ldb")
- return
-}
-
-func (v *Volume) writeNeedle(n *needle.Needle) (offset uint64, size uint32, isUnchanged bool, err error) {
- glog.V(4).Infof("writing needle %s", needle.NewFileIdFromNeedle(v.Id, n).String())
- v.dataFileAccessLock.Lock()
- defer v.dataFileAccessLock.Unlock()
- if v.isFileUnchanged(n) {
- size = n.DataSize
- isUnchanged = true
- return
- }
-
- if n.Ttl == needle.EMPTY_TTL && v.Ttl != needle.EMPTY_TTL {
- n.SetHasTtl()
- n.Ttl = v.Ttl
- }
-
- // check whether existing needle cookie matches
- nv, ok := v.nm.Get(n.Id)
- if ok {
- existingNeedle, _, _, existingNeedleReadErr := needle.ReadNeedleHeader(v.DataBackend, v.Version(), nv.Offset.ToAcutalOffset())
- if existingNeedleReadErr != nil {
- err = fmt.Errorf("reading existing needle: %v", existingNeedleReadErr)
- return
- }
- if existingNeedle.Cookie != n.Cookie {
- glog.V(0).Infof("write cookie mismatch: existing %x, new %x", existingNeedle.Cookie, n.Cookie)
- err = fmt.Errorf("mismatching cookie %x", n.Cookie)
- return
- }
- }
-
- // append to dat file
- n.AppendAtNs = uint64(time.Now().UnixNano())
- if offset, size, _, err = n.Append(v.DataBackend, v.Version()); err != nil {
- return
- }
- v.lastAppendAtNs = n.AppendAtNs
-
- // add to needle map
- if !ok || uint64(nv.Offset.ToAcutalOffset()) < offset {
- if err = v.nm.Put(n.Id, ToOffset(int64(offset)), n.Size); err != nil {
- glog.V(4).Infof("failed to save in needle map %d: %v", n.Id, err)
- }
- }
- if v.lastModifiedTsSeconds < n.LastModified {
- v.lastModifiedTsSeconds = n.LastModified
- }
- return
-}
-
-func (v *Volume) deleteNeedle(n *needle.Needle) (uint32, error) {
- glog.V(4).Infof("delete needle %s", needle.NewFileIdFromNeedle(v.Id, n).String())
- v.dataFileAccessLock.Lock()
- defer v.dataFileAccessLock.Unlock()
- nv, ok := v.nm.Get(n.Id)
- //fmt.Println("key", n.Id, "volume offset", nv.Offset, "data_size", n.Size, "cached size", nv.Size)
- if ok && nv.Size != TombstoneFileSize {
- size := nv.Size
- n.Data = nil
- n.AppendAtNs = uint64(time.Now().UnixNano())
- offset, _, _, err := n.Append(v.DataBackend, v.Version())
- if err != nil {
- return size, err
- }
- v.lastAppendAtNs = n.AppendAtNs
- if err = v.nm.Delete(n.Id, ToOffset(int64(offset))); err != nil {
- return size, err
- }
- return size, err
- }
- return 0, nil
-}
-
-// read fills in Needle content by looking up n.Id from NeedleMapper
-func (v *Volume) readNeedle(n *needle.Needle) (int, error) {
- v.dataFileAccessLock.RLock()
- defer v.dataFileAccessLock.RUnlock()
-
- nv, ok := v.nm.Get(n.Id)
- if !ok || nv.Offset.IsZero() {
- return -1, ErrorNotFound
- }
- if nv.Size == TombstoneFileSize {
- return -1, errors.New("already deleted")
- }
- if nv.Size == 0 {
- return 0, nil
- }
- err := n.ReadData(v.DataBackend, nv.Offset.ToAcutalOffset(), nv.Size, v.Version())
- if err != nil {
- return 0, err
- }
- bytesRead := len(n.Data)
- if !n.HasTtl() {
- return bytesRead, nil
- }
- ttlMinutes := n.Ttl.Minutes()
- if ttlMinutes == 0 {
- return bytesRead, nil
- }
- if !n.HasLastModifiedDate() {
- return bytesRead, nil
- }
- if uint64(time.Now().Unix()) < n.LastModified+uint64(ttlMinutes*60) {
- return bytesRead, nil
- }
- return -1, ErrorNotFound
-}
-
-type VolumeFileScanner interface {
- VisitSuperBlock(super_block.SuperBlock) error
- ReadNeedleBody() bool
- VisitNeedle(n *needle.Needle, offset int64, needleHeader, needleBody []byte) error
-}
-
-func ScanVolumeFile(dirname string, collection string, id needle.VolumeId,
- needleMapKind NeedleMapType,
- volumeFileScanner VolumeFileScanner) (err error) {
- var v *Volume
- if v, err = loadVolumeWithoutIndex(dirname, collection, id, needleMapKind); err != nil {
- return fmt.Errorf("failed to load volume %d: %v", id, err)
- }
- if v.volumeInfo.Version == 0 {
- if err = volumeFileScanner.VisitSuperBlock(v.SuperBlock); err != nil {
- return fmt.Errorf("failed to process volume %d super block: %v", id, err)
- }
- }
- defer v.Close()
-
- version := v.Version()
-
- offset := int64(v.SuperBlock.BlockSize())
-
- return ScanVolumeFileFrom(version, v.DataBackend, offset, volumeFileScanner)
-}
-
-func ScanVolumeFileFrom(version needle.Version, datBackend backend.BackendStorageFile, offset int64, volumeFileScanner VolumeFileScanner) (err error) {
- n, nh, rest, e := needle.ReadNeedleHeader(datBackend, version, offset)
- if e != nil {
- if e == io.EOF {
- return nil
- }
- return fmt.Errorf("cannot read %s at offset %d: %v", datBackend.Name(), offset, e)
- }
- for n != nil {
- var needleBody []byte
- if volumeFileScanner.ReadNeedleBody() {
- if needleBody, err = n.ReadNeedleBody(datBackend, version, offset+NeedleHeaderSize, rest); err != nil {
- glog.V(0).Infof("cannot read needle body: %v", err)
- //err = fmt.Errorf("cannot read needle body: %v", err)
- //return
- }
- }
- err := volumeFileScanner.VisitNeedle(n, offset, nh, needleBody)
- if err == io.EOF {
- return nil
- }
- if err != nil {
- glog.V(0).Infof("visit needle error: %v", err)
- return fmt.Errorf("visit needle error: %v", err)
- }
- offset += NeedleHeaderSize + rest
- glog.V(4).Infof("==> new entry offset %d", offset)
- if n, nh, rest, err = needle.ReadNeedleHeader(datBackend, version, offset); err != nil {
- if err == io.EOF {
- return nil
- }
- return fmt.Errorf("cannot read needle header at offset %d: %v", offset, err)
- }
- glog.V(4).Infof("new entry needle size:%d rest:%d", n.Size, rest)
- }
- return nil
-}
diff --git a/weed/storage/volume_stream_write.go b/weed/storage/volume_stream_write.go
new file mode 100644
index 000000000..d229bdf20
--- /dev/null
+++ b/weed/storage/volume_stream_write.go
@@ -0,0 +1,104 @@
+package storage
+
+import (
+ "bufio"
+ "fmt"
+ "github.com/chrislusf/seaweedfs/weed/util"
+ "io"
+ "time"
+
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/storage/backend"
+ "github.com/chrislusf/seaweedfs/weed/storage/needle"
+ . "github.com/chrislusf/seaweedfs/weed/storage/types"
+)
+
+func (v *Volume) StreamWrite(n *needle.Needle, data io.Reader, dataSize uint32) (err error) {
+
+ v.dataFileAccessLock.Lock()
+ defer v.dataFileAccessLock.Unlock()
+
+ df, ok := v.DataBackend.(*backend.DiskFile)
+ if !ok {
+ return fmt.Errorf("unexpected volume backend")
+ }
+ offset, _, _ := v.DataBackend.GetStat()
+
+ header := make([]byte, NeedleHeaderSize+TimestampSize) // sized for the timestamp too, so the buffer can be reused below without another allocation
+ CookieToBytes(header[0:CookieSize], n.Cookie)
+ NeedleIdToBytes(header[CookieSize:CookieSize+NeedleIdSize], n.Id)
+ n.Size = 4 + Size(dataSize) + 1 // data-size field (4 bytes) + data + flags byte
+ SizeToBytes(header[CookieSize+NeedleIdSize:CookieSize+NeedleIdSize+SizeSize], n.Size)
+
+ n.DataSize = dataSize
+
+ // needle header
+ df.Write(header[0:NeedleHeaderSize])
+
+ // data size and data
+ util.Uint32toBytes(header[0:4], n.DataSize)
+ df.Write(header[0:4])
+ // write and calculate CRC
+ crcWriter := needle.NewCRCwriter(df)
+ io.Copy(crcWriter, io.LimitReader(data, int64(dataSize)))
+
+ // flags
+ util.Uint8toBytes(header[0:1], n.Flags)
+ df.Write(header[0:1])
+
+ // data checksum
+ util.Uint32toBytes(header[0:needle.NeedleChecksumSize], crcWriter.Sum())
+ // write timestamp, padding
+ n.AppendAtNs = uint64(time.Now().UnixNano())
+ util.Uint64toBytes(header[needle.NeedleChecksumSize:needle.NeedleChecksumSize+TimestampSize], n.AppendAtNs)
+ padding := needle.PaddingLength(n.Size, needle.Version3)
+ df.Write(header[0 : needle.NeedleChecksumSize+TimestampSize+padding])
+
+ // add to needle map
+ if err = v.nm.Put(n.Id, ToOffset(int64(offset)), n.Size); err != nil {
+ glog.V(4).Infof("failed to save in needle map %d: %v", n.Id, err)
+ }
+ return
+}
+
+func (v *Volume) StreamRead(n *needle.Needle, writer io.Writer) (err error) {
+
+ v.dataFileAccessLock.Lock()
+ defer v.dataFileAccessLock.Unlock()
+
+ nv, ok := v.nm.Get(n.Id)
+ if !ok || nv.Offset.IsZero() {
+ return ErrorNotFound
+ }
+
+ sr := &StreamReader{
+ readerAt: v.DataBackend,
+ offset: nv.Offset.ToActualOffset(),
+ }
+ bufReader := bufio.NewReader(sr)
+ bufReader.Discard(NeedleHeaderSize)
+ sizeBuf := make([]byte, 4)
+ if _, err = io.ReadFull(bufReader, sizeBuf); err != nil { // a bare Read may return fewer than 4 bytes
+ return err
+ }
+ if _, err = writer.Write(sizeBuf); err != nil {
+ return err
+ }
+ dataSize := util.BytesToUint32(sizeBuf)
+
+ _, err = io.Copy(writer, io.LimitReader(bufReader, int64(dataSize)))
+
+ return
+}
+
+type StreamReader struct {
+ offset int64
+ readerAt io.ReaderAt
+}
+
+func (sr *StreamReader) Read(p []byte) (n int, err error) {
+ n, err = sr.readerAt.ReadAt(p, sr.offset)
+ if err != nil {
+ return
+ }
+ sr.offset += int64(n)
+ return
+}
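
StreamWrite assembles a version-3 needle record field by field instead of buffering the whole blob. As a reading aid, a sketch of the record size it ends up writing; the byte widths (16-byte header, 4-byte CRC, 8-byte timestamp, 8-byte alignment) are my reading of the types package constants, not spelled out in this diff:

package storage

// streamedRecordSize sketches the on-disk size of a record written by
// StreamWrite above.
func streamedRecordSize(dataSize uint32) uint32 {
	size := 4 + dataSize + 1      // data-size field + data + flags, as set on n.Size
	unpadded := 16 + size + 4 + 8 // header + body + CRC + append timestamp
	padding := 8 - unpadded%8     // mirrors needle.PaddingLength for Version3 (8 when already aligned)
	return unpadded + padding
}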
diff --git a/weed/storage/volume_super_block.go b/weed/storage/volume_super_block.go
index 61c09d85a..20223ac1b 100644
--- a/weed/storage/volume_super_block.go
+++ b/weed/storage/volume_super_block.go
@@ -1,6 +1,7 @@
package storage
import (
+ "fmt"
"os"
"github.com/chrislusf/seaweedfs/weed/glog"
@@ -25,8 +26,10 @@ func (v *Volume) maybeWriteSuperBlock() error {
if dataFile, e = os.Create(v.DataBackend.Name()); e == nil {
v.DataBackend = backend.NewDiskFile(dataFile)
if _, e = v.DataBackend.WriteAt(v.SuperBlock.Bytes(), 0); e == nil {
+ v.noWriteLock.Lock()
v.noWriteOrDelete = false
v.noWriteCanDelete = false
+ v.noWriteLock.Unlock()
}
}
}
@@ -36,5 +39,12 @@ func (v *Volume) maybeWriteSuperBlock() error {
func (v *Volume) readSuperBlock() (err error) {
v.SuperBlock, err = super_block.ReadSuperBlock(v.DataBackend)
+ if v.volumeInfo != nil && v.volumeInfo.Replication != "" {
+ if replication, err := super_block.NewReplicaPlacementFromString(v.volumeInfo.Replication); err != nil {
+ return fmt.Errorf("Error parse volume %d replication %s : %v", v.Id, v.volumeInfo.Replication, err)
+ } else {
+ v.SuperBlock.ReplicaPlacement = replication
+ }
+ }
return err
}
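
readSuperBlock now lets a Replication value carried in the .vif file override the placement stored in the super block. For context, a quick sketch of parsing such a replication string; the xyz digit meaning (data centers / racks / same-rack servers) is the usual SeaweedFS convention and an assumption here, not shown in this diff:

package main

import (
	"fmt"
	"log"

	"github.com/chrislusf/seaweedfs/weed/storage/super_block"
)

func main() {
	// "010": no copies in other data centers, one copy on a
	// different rack, no extra copies on the same rack.
	rp, err := super_block.NewReplicaPlacementFromString("010")
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println(rp.DiffDataCenterCount, rp.DiffRackCount, rp.SameRackCount) // 0 1 0
}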
diff --git a/weed/storage/volume_tier.go b/weed/storage/volume_tier.go
index 99071285f..23160906b 100644
--- a/weed/storage/volume_tier.go
+++ b/weed/storage/volume_tier.go
@@ -6,6 +6,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
"github.com/chrislusf/seaweedfs/weed/storage/backend"
_ "github.com/chrislusf/seaweedfs/weed/storage/backend/s3_backend"
+ "github.com/chrislusf/seaweedfs/weed/storage/needle"
)
func (v *Volume) GetVolumeInfo() *volume_server_pb.VolumeInfo {
@@ -14,12 +15,21 @@ func (v *Volume) GetVolumeInfo() *volume_server_pb.VolumeInfo {
func (v *Volume) maybeLoadVolumeInfo() (found bool) {
- v.volumeInfo, found = pb.MaybeLoadVolumeInfo(v.FileName() + ".vif")
+ var err error
+ v.volumeInfo, v.hasRemoteFile, found, err = pb.MaybeLoadVolumeInfo(v.FileName(".vif"))
- if found {
+ if v.volumeInfo.Version == 0 {
+ v.volumeInfo.Version = uint32(needle.CurrentVersion)
+ }
+
+ if v.hasRemoteFile {
glog.V(0).Infof("volume %d is tiered to %s as %s and read only", v.Id,
v.volumeInfo.Files[0].BackendName(), v.volumeInfo.Files[0].Key)
- v.hasRemoteFile = true
+ }
+
+ if err != nil {
+ glog.Warningf("load volume %d.vif file: %v", v.Id, err)
+ return
}
return
@@ -44,7 +54,7 @@ func (v *Volume) LoadRemoteFile() error {
func (v *Volume) SaveVolumeInfo() error {
- tierFileName := v.FileName() + ".vif"
+ tierFileName := v.FileName(".vif")
return pb.SaveVolumeInfo(tierFileName, v.volumeInfo)
diff --git a/weed/storage/volume_vacuum.go b/weed/storage/volume_vacuum.go
index 434b5989d..be84f8a13 100644
--- a/weed/storage/volume_vacuum.go
+++ b/weed/storage/volume_vacuum.go
@@ -3,6 +3,7 @@ package storage
import (
"fmt"
"os"
+ "runtime"
"time"
"github.com/chrislusf/seaweedfs/weed/glog"
@@ -48,15 +49,20 @@ func (v *Volume) Compact(preallocate int64, compactionBytePerSecond int64) error
v.isCompacting = false
}()
- filePath := v.FileName()
v.lastCompactIndexOffset = v.IndexFileSize()
v.lastCompactRevision = v.SuperBlock.CompactionRevision
glog.V(3).Infof("creating copies for volume %d ,last offset %d...", v.Id, v.lastCompactIndexOffset)
- return v.copyDataAndGenerateIndexFile(filePath+".cpd", filePath+".cpx", preallocate, compactionBytePerSecond)
+ if err := v.DataBackend.Sync(); err != nil {
+ glog.V(0).Infof("compact fail to sync volume %d", v.Id)
+ }
+ if err := v.nm.Sync(); err != nil {
+ glog.V(0).Infof("compact fail to sync volume idx %d", v.Id)
+ }
+ return v.copyDataAndGenerateIndexFile(v.FileName(".cpd"), v.FileName(".cpx"), preallocate, compactionBytePerSecond)
}
// compact a volume based on deletions in .idx files
-func (v *Volume) Compact2(preallocate int64) error {
+func (v *Volume) Compact2(preallocate int64, compactionBytePerSecond int64) error {
if v.MemoryMapMaxSizeMb != 0 { //it makes no sense to compact in memory
return nil
@@ -68,11 +74,16 @@ func (v *Volume) Compact2(preallocate int64) error {
v.isCompacting = false
}()
- filePath := v.FileName()
v.lastCompactIndexOffset = v.IndexFileSize()
v.lastCompactRevision = v.SuperBlock.CompactionRevision
glog.V(3).Infof("creating copies for volume %d ...", v.Id)
- return v.copyDataBasedOnIndexFile(filePath+".cpd", filePath+".cpx", preallocate)
+ if err := v.DataBackend.Sync(); err != nil {
+ glog.V(0).Infof("compact2 fail to sync volume dat %d: %v", v.Id, err)
+ }
+ if err := v.nm.Sync(); err != nil {
+ glog.V(0).Infof("compact2 fail to sync volume idx %d: %v", v.Id, err)
+ }
+ return copyDataBasedOnIndexFile(v.FileName(".dat"), v.FileName(".idx"), v.FileName(".cpd"), v.FileName(".cpx"), v.SuperBlock, v.Version(), preallocate, compactionBytePerSecond)
}
func (v *Volume) CommitCompact() error {
@@ -91,38 +102,49 @@ func (v *Volume) CommitCompact() error {
glog.V(3).Infof("Got volume %d committing lock...", v.Id)
v.nm.Close()
- if err := v.DataBackend.Close(); err != nil {
- glog.V(0).Infof("fail to close volume %d", v.Id)
+ if v.DataBackend != nil {
+ if err := v.DataBackend.Close(); err != nil {
+ glog.V(0).Infof("fail to close volume %d", v.Id)
+ }
}
v.DataBackend = nil
stats.VolumeServerVolumeCounter.WithLabelValues(v.Collection, "volume").Dec()
var e error
- if e = v.makeupDiff(v.FileName()+".cpd", v.FileName()+".cpx", v.FileName()+".dat", v.FileName()+".idx"); e != nil {
+ if e = v.makeupDiff(v.FileName(".cpd"), v.FileName(".cpx"), v.FileName(".dat"), v.FileName(".idx")); e != nil {
glog.V(0).Infof("makeupDiff in CommitCompact volume %d failed %v", v.Id, e)
- e = os.Remove(v.FileName() + ".cpd")
+ e = os.Remove(v.FileName(".cpd"))
if e != nil {
return e
}
- e = os.Remove(v.FileName() + ".cpx")
+ e = os.Remove(v.FileName(".cpx"))
if e != nil {
return e
}
} else {
+ if runtime.GOOS == "windows" {
+ e = os.RemoveAll(v.FileName(".dat"))
+ if e != nil {
+ return e
+ }
+ e = os.RemoveAll(v.FileName(".idx"))
+ if e != nil {
+ return e
+ }
+ }
var e error
- if e = os.Rename(v.FileName()+".cpd", v.FileName()+".dat"); e != nil {
- return fmt.Errorf("rename %s: %v", v.FileName()+".cpd", e)
+ if e = os.Rename(v.FileName(".cpd"), v.FileName(".dat")); e != nil {
+ return fmt.Errorf("rename %s: %v", v.FileName(".cpd"), e)
}
- if e = os.Rename(v.FileName()+".cpx", v.FileName()+".idx"); e != nil {
- return fmt.Errorf("rename %s: %v", v.FileName()+".cpx", e)
+ if e = os.Rename(v.FileName(".cpx"), v.FileName(".idx")); e != nil {
+ return fmt.Errorf("rename %s: %v", v.FileName(".cpx"), e)
}
}
//glog.V(3).Infof("Pretending to be vacuuming...")
//time.Sleep(20 * time.Second)
- os.RemoveAll(v.FileName() + ".ldb")
- os.RemoveAll(v.FileName() + ".bdb")
+ os.RemoveAll(v.FileName(".ldb"))
glog.V(3).Infof("Loading volume %d commit file...", v.Id)
if e = v.load(true, false, v.needleMapKind, 0); e != nil {
@@ -134,8 +156,8 @@ func (v *Volume) CommitCompact() error {
func (v *Volume) cleanupCompact() error {
glog.V(0).Infof("Cleaning up volume %d vacuuming...", v.Id)
- e1 := os.Remove(v.FileName() + ".cpd")
- e2 := os.Remove(v.FileName() + ".cpx")
+ e1 := os.Remove(v.FileName(".cpd"))
+ e2 := os.Remove(v.FileName(".cpx"))
if e1 != nil {
return e1
}
@@ -158,9 +180,15 @@ func (v *Volume) makeupDiff(newDatFileName, newIdxFileName, oldDatFileName, oldI
var indexSize int64
oldIdxFile, err := os.Open(oldIdxFileName)
+ if err != nil {
+ return fmt.Errorf("makeupDiff open %s failed: %v", oldIdxFileName, err)
+ }
defer oldIdxFile.Close()
oldDatFile, err := os.Open(oldDatFileName)
+ if err != nil {
+ return fmt.Errorf("makeupDiff open %s failed: %v", oldDatFileName, err)
+ }
oldDatBackend := backend.NewDiskFile(oldDatFile)
defer oldDatBackend.Close()
@@ -183,7 +211,7 @@ func (v *Volume) makeupDiff(newDatFileName, newIdxFileName, oldDatFileName, oldI
type keyField struct {
offset Offset
- size uint32
+ size Size
}
incrementedHasUpdatedIndexEntry := make(map[NeedleId]keyField)
@@ -250,15 +278,15 @@ func (v *Volume) makeupDiff(newDatFileName, newIdxFileName, oldDatFileName, oldI
}
//updated needle
- if !increIdxEntry.offset.IsZero() && increIdxEntry.size != 0 && increIdxEntry.size != TombstoneFileSize {
+ if !increIdxEntry.offset.IsZero() && increIdxEntry.size != 0 && increIdxEntry.size.IsValid() {
//even if the in-memory needle cache is hit, the needle bytes read here are correct
- glog.V(4).Infof("file %d offset %d size %d", key, increIdxEntry.offset.ToAcutalOffset(), increIdxEntry.size)
+ glog.V(4).Infof("file %d offset %d size %d", key, increIdxEntry.offset.ToActualOffset(), increIdxEntry.size)
var needleBytes []byte
- needleBytes, err = needle.ReadNeedleBlob(oldDatBackend, increIdxEntry.offset.ToAcutalOffset(), increIdxEntry.size, v.Version())
+ needleBytes, err = needle.ReadNeedleBlob(oldDatBackend, increIdxEntry.offset.ToActualOffset(), increIdxEntry.size, v.Version())
if err != nil {
- return fmt.Errorf("ReadNeedleBlob %s key %d offset %d size %d failed: %v", oldDatFile.Name(), key, increIdxEntry.offset.ToAcutalOffset(), increIdxEntry.size, err)
+ return fmt.Errorf("ReadNeedleBlob %s key %d offset %d size %d failed: %v", oldDatFile.Name(), key, increIdxEntry.offset.ToActualOffset(), increIdxEntry.size, err)
}
- dst.Write(needleBytes)
+ dstDatBackend.Write(needleBytes)
util.Uint32toBytes(idxEntryBytes[8:12], uint32(offset/NeedlePaddingSize))
} else { //deleted needle
//fakeDelNeedle 's default Data field is nil
@@ -311,7 +339,7 @@ func (scanner *VolumeFileScanner4Vacuum) VisitNeedle(n *needle.Needle, offset in
}
nv, ok := scanner.v.nm.Get(n.Id)
glog.V(4).Infoln("needle expected offset ", offset, "ok", ok, "nv", nv)
- if ok && nv.Offset.ToAcutalOffset() == offset && nv.Size > 0 && nv.Size != TombstoneFileSize {
+ if ok && nv.Offset.ToActualOffset() == offset && nv.Size > 0 && nv.Size.IsValid() {
if err := scanner.nm.Set(n.Id, ToOffset(scanner.newOffset), n.Size); err != nil {
return fmt.Errorf("cannot put needle: %s", err)
}
@@ -330,12 +358,13 @@ func (v *Volume) copyDataAndGenerateIndexFile(dstName, idxName string, prealloca
var (
dst backend.BackendStorageFile
)
- if dst, err = createVolumeFile(dstName, preallocate, 0); err != nil {
+ if dst, err = backend.CreateVolumeFile(dstName, preallocate, 0); err != nil {
return
}
defer dst.Close()
nm := needle_map.NewMemDb()
+ defer nm.Close()
scanner := &VolumeFileScanner4Vacuum{
v: v,
@@ -353,64 +382,70 @@ func (v *Volume) copyDataAndGenerateIndexFile(dstName, idxName string, prealloca
return
}
-func (v *Volume) copyDataBasedOnIndexFile(dstName, idxName string, preallocate int64) (err error) {
+func copyDataBasedOnIndexFile(srcDatName, srcIdxName, dstDatName, datIdxName string, sb super_block.SuperBlock, version needle.Version, preallocate int64, compactionBytePerSecond int64) (err error) {
var (
- dstDatBackend backend.BackendStorageFile
- oldIndexFile *os.File
+ srcDatBackend, dstDatBackend backend.BackendStorageFile
+ dataFile *os.File
)
- if dstDatBackend, err = createVolumeFile(dstName, preallocate, 0); err != nil {
+ if dstDatBackend, err = backend.CreateVolumeFile(dstDatName, preallocate, 0); err != nil {
return
}
defer dstDatBackend.Close()
- if oldIndexFile, err = os.OpenFile(v.FileName()+".idx", os.O_RDONLY, 0644); err != nil {
+ oldNm := needle_map.NewMemDb()
+ defer oldNm.Close()
+ newNm := needle_map.NewMemDb()
+ defer newNm.Close()
+ if err = oldNm.LoadFromIdx(srcIdxName); err != nil {
return
}
- defer oldIndexFile.Close()
-
- nm := needle_map.NewMemDb()
+ if dataFile, err = os.Open(srcDatName); err != nil {
+ return err
+ }
+ srcDatBackend = backend.NewDiskFile(dataFile)
+ defer srcDatBackend.Close()
now := uint64(time.Now().Unix())
- v.SuperBlock.CompactionRevision++
- dstDatBackend.WriteAt(v.SuperBlock.Bytes(), 0)
- newOffset := int64(v.SuperBlock.BlockSize())
+ sb.CompactionRevision++
+ dstDatBackend.WriteAt(sb.Bytes(), 0)
+ newOffset := int64(sb.BlockSize())
- idx2.WalkIndexFile(oldIndexFile, func(key NeedleId, offset Offset, size uint32) error {
- if offset.IsZero() || size == TombstoneFileSize {
- return nil
- }
+ writeThrottler := util.NewWriteThrottler(compactionBytePerSecond)
+
+ oldNm.AscendingVisit(func(value needle_map.NeedleValue) error {
+
+ offset, size := value.Offset, value.Size
- nv, ok := v.nm.Get(key)
- if !ok {
+ if offset.IsZero() || size.IsDeleted() {
return nil
}
n := new(needle.Needle)
- err := n.ReadData(v.DataBackend, offset.ToAcutalOffset(), size, v.Version())
+ err := n.ReadData(srcDatBackend, offset.ToActualOffset(), size, version)
if err != nil {
return nil
}
- if n.HasTtl() && now >= n.LastModified+uint64(v.Ttl.Minutes()*60) {
+ if n.HasTtl() && now >= n.LastModified+uint64(sb.Ttl.Minutes()*60) {
return nil
}
- glog.V(4).Infoln("needle expected offset ", offset, "ok", ok, "nv", nv)
- if nv.Offset == offset && nv.Size > 0 {
- if err = nm.Set(n.Id, ToOffset(newOffset), n.Size); err != nil {
- return fmt.Errorf("cannot put needle: %s", err)
- }
- if _, _, _, err = n.Append(dstDatBackend, v.Version()); err != nil {
- return fmt.Errorf("cannot append needle: %s", err)
- }
- newOffset += n.DiskSize(v.Version())
- glog.V(3).Infoln("saving key", n.Id, "volume offset", offset, "=>", newOffset, "data_size", n.Size)
+ if err = newNm.Set(n.Id, ToOffset(newOffset), n.Size); err != nil {
+ return fmt.Errorf("cannot put needle: %s", err)
}
+ if _, _, _, err = n.Append(dstDatBackend, sb.Version); err != nil {
+ return fmt.Errorf("cannot append needle: %s", err)
+ }
+ delta := n.DiskSize(version)
+ newOffset += delta
+ writeThrottler.MaybeSlowdown(delta)
+ glog.V(4).Infoln("saving key", n.Id, "volume offset", offset, "=>", newOffset, "data_size", n.Size)
+
return nil
})
- nm.SaveToIdx(idxName)
+ newNm.SaveToIdx(datIdxName)
return
}
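
copyDataBasedOnIndexFile now paces itself with util.NewWriteThrottler and MaybeSlowdown, called once per copied needle with the needle's disk size. The throttler's implementation is not part of this diff; a minimal sketch of one way such a byte-rate limiter can work, with all names hypothetical:

package storage

import "time"

// writeThrottler is a stand-in for util.WriteThrottler: it sleeps
// whenever the average write rate exceeds bytesPerSecond.
type writeThrottler struct {
	bytesPerSecond int64
	written        int64
	start          time.Time
}

func newWriteThrottler(bps int64) *writeThrottler {
	return &writeThrottler{bytesPerSecond: bps, start: time.Now()}
}

// MaybeSlowdown accounts delta bytes and sleeps until the average rate
// is back under budget. A budget of 0 disables throttling, matching
// how compactionBytePerSecond == 0 behaves above.
func (t *writeThrottler) MaybeSlowdown(delta int64) {
	if t.bytesPerSecond <= 0 {
		return
	}
	t.written += delta
	expected := time.Duration(t.written * int64(time.Second) / t.bytesPerSecond)
	if elapsed := time.Since(t.start); elapsed < expected {
		time.Sleep(expected - elapsed)
	}
}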
diff --git a/weed/storage/volume_vacuum_test.go b/weed/storage/volume_vacuum_test.go
index 95f43d6ec..cd5a4f430 100644
--- a/weed/storage/volume_vacuum_test.go
+++ b/weed/storage/volume_vacuum_test.go
@@ -69,7 +69,7 @@ func TestCompaction(t *testing.T) {
}
defer os.RemoveAll(dir) // clean up
- v, err := NewVolume(dir, "", 1, NeedleMapInMemory, &super_block.ReplicaPlacement{}, &needle.TTL{}, 0, 0)
+ v, err := NewVolume(dir, dir, "", 1, NeedleMapInMemory, &super_block.ReplicaPlacement{}, &needle.TTL{}, 0, 0)
if err != nil {
t.Fatalf("volume creation: %v", err)
}
@@ -84,7 +84,7 @@ func TestCompaction(t *testing.T) {
}
startTime := time.Now()
- v.Compact2(0)
+ v.Compact2(0, 0)
speed := float64(v.ContentSize()) / time.Now().Sub(startTime).Seconds()
t.Logf("compaction speed: %.2f bytes/s", speed)
@@ -96,7 +96,7 @@ func TestCompaction(t *testing.T) {
v.Close()
- v, err = NewVolume(dir, "", 1, NeedleMapInMemory, nil, nil, 0, 0)
+ v, err = NewVolume(dir, dir, "", 1, NeedleMapInMemory, nil, nil, 0, 0)
if err != nil {
t.Fatalf("volume reloading: %v", err)
}
@@ -113,11 +113,11 @@ func TestCompaction(t *testing.T) {
}
n := newEmptyNeedle(uint64(i))
- size, err := v.readNeedle(n)
+ size, err := v.readNeedle(n, nil)
if err != nil {
t.Fatalf("read file %d: %v", i, err)
}
- if infos[i-1].size != uint32(size) {
+ if infos[i-1].size != types.Size(size) {
t.Fatalf("read file %d size mismatch expected %d found %d", i, infos[i-1].size, size)
}
if infos[i-1].crc != n.Checksum {
@@ -129,7 +129,7 @@ func TestCompaction(t *testing.T) {
}
func doSomeWritesDeletes(i int, v *Volume, t *testing.T, infos []*needleInfo) {
n := newRandomNeedle(uint64(i))
- _, size, _, err := v.writeNeedle(n)
+ _, size, _, err := v.writeNeedle2(n, false)
if err != nil {
t.Fatalf("write file %d: %v", i, err)
}
@@ -141,7 +141,7 @@ func doSomeWritesDeletes(i int, v *Volume, t *testing.T, infos []*needleInfo) {
if rand.Float64() < 0.03 {
toBeDeleted := rand.Intn(i) + 1
oldNeedle := newEmptyNeedle(uint64(toBeDeleted))
- v.deleteNeedle(oldNeedle)
+ v.deleteNeedle2(oldNeedle)
// println("deleted file", toBeDeleted)
infos[toBeDeleted-1] = &needleInfo{
size: 0,
@@ -151,7 +151,7 @@ func doSomeWritesDeletes(i int, v *Volume, t *testing.T, infos []*needleInfo) {
}
type needleInfo struct {
- size uint32
+ size types.Size
crc needle.CRC
}
diff --git a/weed/storage/volume_write.go b/weed/storage/volume_write.go
new file mode 100644
index 000000000..a286c5dd5
--- /dev/null
+++ b/weed/storage/volume_write.go
@@ -0,0 +1,327 @@
+package storage
+
+import (
+ "bytes"
+ "errors"
+ "fmt"
+ "os"
+ "time"
+
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/storage/backend"
+ "github.com/chrislusf/seaweedfs/weed/storage/needle"
+ . "github.com/chrislusf/seaweedfs/weed/storage/types"
+)
+
+var ErrorNotFound = errors.New("not found")
+var ErrorDeleted = errors.New("already deleted")
+var ErrorSizeMismatch = errors.New("size mismatch")
+
+func (v *Volume) checkReadWriteError(err error) {
+ if err == nil {
+ if v.lastIoError != nil {
+ v.lastIoError = nil
+ }
+ return
+ }
+ if err.Error() == "input/output error" {
+ v.lastIoError = err
+ }
+}
+
+// isFileUnchanged checks whether the needle to write is the same as the last one.
+// It requires serialized access to the same volume.
+func (v *Volume) isFileUnchanged(n *needle.Needle) bool {
+ if v.Ttl.String() != "" {
+ return false
+ }
+
+ nv, ok := v.nm.Get(n.Id)
+ if ok && !nv.Offset.IsZero() && nv.Size.IsValid() {
+ oldNeedle := new(needle.Needle)
+ err := oldNeedle.ReadData(v.DataBackend, nv.Offset.ToActualOffset(), nv.Size, v.Version())
+ if err != nil {
+ glog.V(0).Infof("Failed to check updated file at offset %d size %d: %v", nv.Offset.ToActualOffset(), nv.Size, err)
+ return false
+ }
+ if oldNeedle.Cookie == n.Cookie && oldNeedle.Checksum == n.Checksum && bytes.Equal(oldNeedle.Data, n.Data) {
+ n.DataSize = oldNeedle.DataSize
+ return true
+ }
+ }
+ return false
+}
+
+// Destroy removes everything related to this volume
+func (v *Volume) Destroy() (err error) {
+ if v.isCompacting {
+ err = fmt.Errorf("volume %d is compacting", v.Id)
+ return
+ }
+ close(v.asyncRequestsChan)
+ storageName, storageKey := v.RemoteStorageNameKey()
+ if v.HasRemoteFile() && storageName != "" && storageKey != "" {
+ if backendStorage, found := backend.BackendStorages[storageName]; found {
+ backendStorage.DeleteFile(storageKey)
+ }
+ }
+ v.Close()
+ removeVolumeFiles(v.DataFileName())
+ removeVolumeFiles(v.IndexFileName())
+ return
+}
+
+func removeVolumeFiles(filename string) {
+ // basic
+ os.Remove(filename + ".dat")
+ os.Remove(filename + ".idx")
+ os.Remove(filename + ".vif")
+ // sorted index file
+ os.Remove(filename + ".sdx")
+ // compaction
+ os.Remove(filename + ".cpd")
+ os.Remove(filename + ".cpx")
+ // level db index files
+ os.RemoveAll(filename + ".ldb")
+ // marker for damaged or incomplete volume
+ os.Remove(filename + ".note")
+}
+
+func (v *Volume) asyncRequestAppend(request *needle.AsyncRequest) {
+ v.asyncRequestsChan <- request
+}
+
+func (v *Volume) syncWrite(n *needle.Needle) (offset uint64, size Size, isUnchanged bool, err error) {
+ // glog.V(4).Infof("writing needle %s", needle.NewFileIdFromNeedle(v.Id, n).String())
+ actualSize := needle.GetActualSize(Size(len(n.Data)), v.Version())
+
+ v.dataFileAccessLock.Lock()
+ defer v.dataFileAccessLock.Unlock()
+
+ if MaxPossibleVolumeSize < v.nm.ContentSize()+uint64(actualSize) {
+ err = fmt.Errorf("volume size limit %d exceeded! current size is %d", MaxPossibleVolumeSize, v.nm.ContentSize())
+ return
+ }
+
+ return v.doWriteRequest(n)
+}
+
+func (v *Volume) writeNeedle2(n *needle.Needle, fsync bool) (offset uint64, size Size, isUnchanged bool, err error) {
+ // glog.V(4).Infof("writing needle %s", needle.NewFileIdFromNeedle(v.Id, n).String())
+ if n.Ttl == needle.EMPTY_TTL && v.Ttl != needle.EMPTY_TTL {
+ n.SetHasTtl()
+ n.Ttl = v.Ttl
+ }
+
+ if !fsync {
+ return v.syncWrite(n)
+ } else {
+ asyncRequest := needle.NewAsyncRequest(n, true)
+ // use len(n.Data) here because n.Size is not populated until n.Append()
+ asyncRequest.ActualSize = needle.GetActualSize(Size(len(n.Data)), v.Version())
+
+ v.asyncRequestAppend(asyncRequest)
+ offset, _, isUnchanged, err = asyncRequest.WaitComplete()
+
+ return
+ }
+}
+
+func (v *Volume) doWriteRequest(n *needle.Needle) (offset uint64, size Size, isUnchanged bool, err error) {
+ // glog.V(4).Infof("writing needle %s", needle.NewFileIdFromNeedle(v.Id, n).String())
+ if v.isFileUnchanged(n) {
+ size = Size(n.DataSize)
+ isUnchanged = true
+ return
+ }
+
+ // check whether existing needle cookie matches
+ nv, ok := v.nm.Get(n.Id)
+ if ok {
+ existingNeedle, _, _, existingNeedleReadErr := needle.ReadNeedleHeader(v.DataBackend, v.Version(), nv.Offset.ToActualOffset())
+ if existingNeedleReadErr != nil {
+ err = fmt.Errorf("reading existing needle: %v", existingNeedleReadErr)
+ return
+ }
+ if existingNeedle.Cookie != n.Cookie {
+ glog.V(0).Infof("write cookie mismatch: existing %s, new %s",
+ needle.NewFileIdFromNeedle(v.Id, existingNeedle), needle.NewFileIdFromNeedle(v.Id, n))
+ err = fmt.Errorf("mismatching cookie %x", n.Cookie)
+ return
+ }
+ }
+
+ // append to dat file
+ n.AppendAtNs = uint64(time.Now().UnixNano())
+ offset, size, _, err = n.Append(v.DataBackend, v.Version())
+ v.checkReadWriteError(err)
+ if err != nil {
+ return
+ }
+ v.lastAppendAtNs = n.AppendAtNs
+
+ // add to needle map
+ if !ok || uint64(nv.Offset.ToActualOffset()) < offset {
+ if err = v.nm.Put(n.Id, ToOffset(int64(offset)), n.Size); err != nil {
+ glog.V(4).Infof("failed to save in needle map %d: %v", n.Id, err)
+ }
+ }
+ if v.lastModifiedTsSeconds < n.LastModified {
+ v.lastModifiedTsSeconds = n.LastModified
+ }
+ return
+}
+
+func (v *Volume) syncDelete(n *needle.Needle) (Size, error) {
+ // glog.V(4).Infof("delete needle %s", needle.NewFileIdFromNeedle(v.Id, n).String())
+ actualSize := needle.GetActualSize(0, v.Version())
+ v.dataFileAccessLock.Lock()
+ defer v.dataFileAccessLock.Unlock()
+
+ if MaxPossibleVolumeSize < v.nm.ContentSize()+uint64(actualSize) {
+ err := fmt.Errorf("volume size limit %d exceeded! current size is %d", MaxPossibleVolumeSize, v.nm.ContentSize())
+ return 0, err
+ }
+
+ return v.doDeleteRequest(n)
+}
+
+func (v *Volume) deleteNeedle2(n *needle.Needle) (Size, error) {
+ // todo: delete records are always appended without fsync; this may need fsync in the future
+ fsync := false
+
+ if !fsync {
+ return v.syncDelete(n)
+ } else {
+ asyncRequest := needle.NewAsyncRequest(n, false)
+ asyncRequest.ActualSize = needle.GetActualSize(0, v.Version())
+
+ v.asyncRequestAppend(asyncRequest)
+ _, size, _, err := asyncRequest.WaitComplete()
+
+ return Size(size), err
+ }
+}
+
+func (v *Volume) doDeleteRequest(n *needle.Needle) (Size, error) {
+ glog.V(4).Infof("delete needle %s", needle.NewFileIdFromNeedle(v.Id, n).String())
+ nv, ok := v.nm.Get(n.Id)
+ // fmt.Println("key", n.Id, "volume offset", nv.Offset, "data_size", n.Size, "cached size", nv.Size)
+ if ok && nv.Size.IsValid() {
+ size := nv.Size
+ n.Data = nil
+ n.AppendAtNs = uint64(time.Now().UnixNano())
+ offset, _, _, err := n.Append(v.DataBackend, v.Version())
+ v.checkReadWriteError(err)
+ if err != nil {
+ return size, err
+ }
+ v.lastAppendAtNs = n.AppendAtNs
+ if err = v.nm.Delete(n.Id, ToOffset(int64(offset))); err != nil {
+ return size, err
+ }
+ return size, err
+ }
+ return 0, nil
+}
+
+func (v *Volume) startWorker() {
+ go func() {
+ chanClosed := false
+ for {
+ // the channel was closed; exit the goroutine
+ if chanClosed {
+ break
+ }
+ currentRequests := make([]*needle.AsyncRequest, 0, 128)
+ currentBytesToWrite := int64(0)
+ for {
+ request, ok := <-v.asyncRequestsChan
+ // volume may be closed
+ if !ok {
+ chanClosed = true
+ break
+ }
+ if MaxPossibleVolumeSize < v.ContentSize()+uint64(currentBytesToWrite+request.ActualSize) {
+ request.Complete(0, 0, false,
+ fmt.Errorf("volume size limit %d exceeded! current size is %d", MaxPossibleVolumeSize, v.ContentSize()))
+ break
+ }
+ currentRequests = append(currentRequests, request)
+ currentBytesToWrite += request.ActualSize
+ // submit at most 4 MiB or 128 requests at a time to keep request latency low;
+ // also break when the channel is empty to avoid an I/O hang.
+ if currentBytesToWrite >= 4*1024*1024 || len(currentRequests) >= 128 || len(v.asyncRequestsChan) == 0 {
+ break
+ }
+ }
+ if len(currentRequests) == 0 {
+ continue
+ }
+ v.dataFileAccessLock.Lock()
+ end, _, e := v.DataBackend.GetStat()
+ if e != nil {
+ for i := 0; i < len(currentRequests); i++ {
+ currentRequests[i].Complete(0, 0, false,
+ fmt.Errorf("cannot read current volume position: %v", e))
+ }
+ v.dataFileAccessLock.Unlock()
+ continue
+ }
+
+ for i := 0; i < len(currentRequests); i++ {
+ if currentRequests[i].IsWriteRequest {
+ offset, size, isUnchanged, err := v.doWriteRequest(currentRequests[i].N)
+ currentRequests[i].UpdateResult(offset, uint64(size), isUnchanged, err)
+ } else {
+ size, err := v.doDeleteRequest(currentRequests[i].N)
+ currentRequests[i].UpdateResult(0, uint64(size), false, err)
+ }
+ }
+
+ // if the sync fails, the data is not reliable: mark the completed requests as failed and roll back
+ if err := v.DataBackend.Sync(); err != nil {
+ // todo: this may leave dirty data or cause inconsistency; maybe weed needs to panic here?
+ if te := v.DataBackend.Truncate(end); te != nil {
+ glog.V(0).Infof("Failed to truncate %s back to %d with error: %v", v.DataBackend.Name(), end, te)
+ }
+ for i := 0; i < len(currentRequests); i++ {
+ if currentRequests[i].IsSucceed() {
+ currentRequests[i].UpdateResult(0, 0, false, err)
+ }
+ }
+ }
+
+ for i := 0; i < len(currentRequests); i++ {
+ currentRequests[i].Submit()
+ }
+ v.dataFileAccessLock.Unlock()
+ }
+ }()
+}
+
+func (v *Volume) WriteNeedleBlob(needleId NeedleId, needleBlob []byte, size Size) error {
+
+ v.dataFileAccessLock.Lock()
+ defer v.dataFileAccessLock.Unlock()
+
+ if MaxPossibleVolumeSize < v.nm.ContentSize()+uint64(len(needleBlob)) {
+ return fmt.Errorf("volume size limit %d exceeded! current size is %d", MaxPossibleVolumeSize, v.nm.ContentSize())
+ }
+
+ appendAtNs := uint64(time.Now().UnixNano())
+ offset, err := needle.WriteNeedleBlob(v.DataBackend, needleBlob, size, appendAtNs, v.Version())
+
+ v.checkReadWriteError(err)
+ if err != nil {
+ return err
+ }
+ v.lastAppendAtNs = appendAtNs
+
+ // add to needle map
+ if err = v.nm.Put(needleId, ToOffset(int64(offset)), size); err != nil {
+ glog.V(4).Infof("failed to put in needle map %d: %v", needleId, err)
+ }
+
+ return err
+}
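
startWorker is a group-commit loop: it batches up to 128 requests or 4 MiB of writes, applies them, fsyncs once, and on a failed sync truncates back to the batch start and fails every request in the batch. A condensed, self-contained sketch of the same pattern over a plain *os.File; all types here are hypothetical stand-ins, not the weed/storage ones:

package main

import (
	"io"
	"os"
)

// request carries one pending append and a channel for its outcome.
type request struct {
	data []byte
	done chan error
}

// groupCommit drains batches from pending and issues one Sync per
// batch; if the sync fails it truncates back to the batch start
// (best effort) and fails the whole batch, as startWorker does.
func groupCommit(f *os.File, pending chan *request) {
	for first := range pending {
		batch := []*request{first}
		for len(batch) < 128 && len(pending) > 0 {
			batch = append(batch, <-pending)
		}
		start, _ := f.Seek(0, io.SeekEnd) // batch start, for rollback
		var err error
		for _, r := range batch {
			if _, err = f.Write(r.data); err != nil {
				break
			}
		}
		if err == nil {
			err = f.Sync()
		}
		if err != nil {
			f.Truncate(start)
		}
		for _, r := range batch {
			r.done <- err
		}
	}
}

func main() {
	f, _ := os.CreateTemp("", "groupcommit")
	defer os.Remove(f.Name())
	pending := make(chan *request, 8)
	go groupCommit(f, pending)
	r := &request{data: []byte("hello"), done: make(chan error, 1)}
	pending <- r
	if err := <-r.done; err != nil {
		panic(err)
	}
}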