aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--unmaintained/fix_dat/fix_dat.go2
-rw-r--r--unmaintained/remove_duplicate_fids/remove_duplicate_fids.go2
-rw-r--r--weed/command/master.go3
-rw-r--r--weed/command/scaffold.go15
-rw-r--r--weed/command/scaffold_test.go42
-rw-r--r--weed/server/common.go2
-rw-r--r--weed/server/volume_grpc_copy_incremental.go2
-rw-r--r--weed/storage/backend/backend.go43
-rw-r--r--weed/storage/backend/disk_file.go2
-rw-r--r--weed/storage/backend/memory_map/memory_map_backend.go2
-rw-r--r--weed/storage/backend/s3_backend/s3_backend.go124
-rw-r--r--weed/storage/needle/needle_read_write.go12
-rw-r--r--weed/storage/volume.go2
-rw-r--r--weed/storage/volume_checking.go2
-rw-r--r--weed/storage/volume_create.go2
-rw-r--r--weed/storage/volume_create_linux.go2
-rw-r--r--weed/storage/volume_create_windows.go2
-rw-r--r--weed/storage/volume_read_write.go2
-rw-r--r--weed/storage/volume_super_block.go2
-rw-r--r--weed/storage/volume_vacuum.go6
20 files changed, 185 insertions, 86 deletions
diff --git a/unmaintained/fix_dat/fix_dat.go b/unmaintained/fix_dat/fix_dat.go
index e88a48c96..bee30b2bd 100644
--- a/unmaintained/fix_dat/fix_dat.go
+++ b/unmaintained/fix_dat/fix_dat.go
@@ -73,7 +73,7 @@ func main() {
}
-func iterateEntries(datBackend backend.DataStorageBackend, idxFile *os.File, visitNeedle func(n *needle.Needle, offset int64)) {
+func iterateEntries(datBackend backend.BackendStorageFile, idxFile *os.File, visitNeedle func(n *needle.Needle, offset int64)) {
// start to read index file
var readerOffset int64
bytes := make([]byte, 16)
diff --git a/unmaintained/remove_duplicate_fids/remove_duplicate_fids.go b/unmaintained/remove_duplicate_fids/remove_duplicate_fids.go
index a162d1757..dfbb82049 100644
--- a/unmaintained/remove_duplicate_fids/remove_duplicate_fids.go
+++ b/unmaintained/remove_duplicate_fids/remove_duplicate_fids.go
@@ -29,7 +29,7 @@ type VolumeFileScanner4SeeDat struct {
dir string
hashes map[string]bool
dat *os.File
- datBackend backend.DataStorageBackend
+ datBackend backend.BackendStorageFile
}
func (scanner *VolumeFileScanner4SeeDat) VisitSuperBlock(superBlock storage.SuperBlock) error {
diff --git a/weed/command/master.go b/weed/command/master.go
index 3d33f4f7a..8d0a3289c 100644
--- a/weed/command/master.go
+++ b/weed/command/master.go
@@ -12,6 +12,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
"github.com/chrislusf/seaweedfs/weed/security"
"github.com/chrislusf/seaweedfs/weed/server"
+ "github.com/chrislusf/seaweedfs/weed/storage/backend"
"github.com/chrislusf/seaweedfs/weed/util"
"github.com/gorilla/mux"
"github.com/spf13/viper"
@@ -101,6 +102,8 @@ func runMaster(cmd *Command, args []string) bool {
func startMaster(masterOption MasterOptions, masterWhiteList []string) {
+ backend.LoadConfiguration(viper.GetViper())
+
myMasterAddress, peers := checkPeers(*masterOption.ip, *masterOption.port, *masterOption.peers)
r := mux.NewRouter()
diff --git a/weed/command/scaffold.go b/weed/command/scaffold.go
index d0bdb3c60..f615033c7 100644
--- a/weed/command/scaffold.go
+++ b/weed/command/scaffold.go
@@ -357,13 +357,14 @@ type = "memory" # Choose [memory|etcd] type for storing the file id sequence
sequencer_etcd_urls = "http://127.0.0.1:2379"
-[storage.backend.s3]
-enabled = true
-aws_access_key_id = "" # if empty, loads from the shared credentials file (~/.aws/credentials).
-aws_secret_access_key = "" # if empty, loads from the shared credentials file (~/.aws/credentials).
-region = "us-east-2"
-bucket = "your_bucket_name" # an existing bucket
-directory = "/" # destination directory
+[storage.backend]
+ [storage.backend.s3.default]
+ enabled = true
+ aws_access_key_id = "" # if empty, loads from the shared credentials file (~/.aws/credentials).
+ aws_secret_access_key = "" # if empty, loads from the shared credentials file (~/.aws/credentials).
+ region = "us-east-2"
+ bucket = "your_bucket_name" # an existing bucket
+ directory = "/" # destination directory
`
)
diff --git a/weed/command/scaffold_test.go b/weed/command/scaffold_test.go
new file mode 100644
index 000000000..3d69fa317
--- /dev/null
+++ b/weed/command/scaffold_test.go
@@ -0,0 +1,42 @@
+package command
+
+import (
+	"bytes"
+	"fmt"
+	"testing"
+
+	"github.com/spf13/viper"
+)
+
+func TestReadingTomlConfiguration(t *testing.T) {
+
+ viper.SetConfigType("toml")
+
+ // any approach to require this configuration into your program.
+ var tomlExample = []byte(`
+[database]
+server = "192.168.1.1"
+ports = [ 8001, 8001, 8002 ]
+connection_max = 5000
+enabled = true
+
+[servers]
+
+ # You can indent as you please. Tabs or spaces. TOML don't care.
+ [servers.alpha]
+ ip = "10.0.0.1"
+ dc = "eqdc10"
+
+ [servers.beta]
+ ip = "10.0.0.2"
+ dc = "eqdc10"
+
+`)
+
+ viper.ReadConfig(bytes.NewBuffer(tomlExample))
+
+ fmt.Printf("database is %v\n", viper.Get("database"))
+ fmt.Printf("servers is %v\n", viper.GetStringMap("servers"))
+
+ alpha := viper.Sub("servers.alpha")
+
+ fmt.Printf("alpha ip is %v\n", alpha.GetString("ip"))
+}
diff --git a/weed/server/common.go b/weed/server/common.go
index d50c283f2..e3e16b927 100644
--- a/weed/server/common.go
+++ b/weed/server/common.go
@@ -14,6 +14,8 @@ import (
"github.com/chrislusf/seaweedfs/weed/storage/needle"
"google.golang.org/grpc"
+ _ "github.com/chrislusf/seaweedfs/weed/storage/backend/s3_backend"
+
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/operation"
"github.com/chrislusf/seaweedfs/weed/stats"
diff --git a/weed/server/volume_grpc_copy_incremental.go b/weed/server/volume_grpc_copy_incremental.go
index 6c5bb8a62..6d6c3daa3 100644
--- a/weed/server/volume_grpc_copy_incremental.go
+++ b/weed/server/volume_grpc_copy_incremental.go
@@ -47,7 +47,7 @@ func (vs *VolumeServer) VolumeSyncStatus(ctx context.Context, req *volume_server
}
-func sendFileContent(datBackend backend.DataStorageBackend, buf []byte, startOffset, stopOffset int64, stream volume_server_pb.VolumeServer_VolumeIncrementalCopyServer) error {
+func sendFileContent(datBackend backend.BackendStorageFile, buf []byte, startOffset, stopOffset int64, stream volume_server_pb.VolumeServer_VolumeIncrementalCopyServer) error {
var blockSizeLimit = int64(len(buf))
for i := int64(0); i < stopOffset-startOffset; i += blockSizeLimit {
n, readErr := datBackend.ReadAt(buf, startOffset+i)
diff --git a/weed/storage/backend/backend.go b/weed/storage/backend/backend.go
index 4d72abc87..36631e113 100644
--- a/weed/storage/backend/backend.go
+++ b/weed/storage/backend/backend.go
@@ -4,9 +4,13 @@ import (
"io"
"os"
"time"
+
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/util"
+ "github.com/spf13/viper"
)
-type DataStorageBackend interface {
+type BackendStorageFile interface {
io.ReaderAt
io.WriterAt
Truncate(off int64) error
@@ -16,6 +20,41 @@ type DataStorageBackend interface {
Instantiate(src *os.File) error
}
+type BackendStorage interface {
+ Name() string
+ NewStorageFile(key string) BackendStorageFile
+}
+
+type StorageType string
+type BackendStorageFactory interface {
+ StorageType() StorageType
+ BuildStorage(configuration util.Configuration, id string) (BackendStorage, error)
+}
+
var (
- StorageBackends []DataStorageBackend
+ BackendStorageFactories = make(map[StorageType]BackendStorageFactory)
+ BackendStorages = make(map[string]BackendStorage)
)
+
+func LoadConfiguration(config *viper.Viper) {
+
+ StorageBackendPrefix := "storage.backend"
+
+ backendSub := config.Sub(StorageBackendPrefix)
+
+	for backendTypeName := range config.GetStringMap(StorageBackendPrefix) {
+ backendStorageFactory, found := BackendStorageFactories[StorageType(backendTypeName)]
+ if !found {
+ glog.Fatalf("backend storage type %s not found", backendTypeName)
+ }
+ backendTypeSub := backendSub.Sub(backendTypeName)
+		for backendStorageId := range backendSub.GetStringMap(backendTypeName) {
+ backendStorage, buildErr := backendStorageFactory.BuildStorage(backendTypeSub.Sub(backendStorageId), backendStorageId)
+ if buildErr != nil {
+				glog.Fatalf("fail to create backend storage %s.%s: %v", backendTypeName, backendStorageId, buildErr)
+ }
+ BackendStorages[backendTypeName+"."+backendStorageId] = backendStorage
+ }
+ }
+
+}
diff --git a/weed/storage/backend/disk_file.go b/weed/storage/backend/disk_file.go
index 4fc3ed0c4..4929f0573 100644
--- a/weed/storage/backend/disk_file.go
+++ b/weed/storage/backend/disk_file.go
@@ -6,7 +6,7 @@ import (
)
var (
- _ DataStorageBackend = &DiskFile{}
+ _ BackendStorageFile = &DiskFile{}
)
type DiskFile struct {
diff --git a/weed/storage/backend/memory_map/memory_map_backend.go b/weed/storage/backend/memory_map/memory_map_backend.go
index c57252683..f1d723c3b 100644
--- a/weed/storage/backend/memory_map/memory_map_backend.go
+++ b/weed/storage/backend/memory_map/memory_map_backend.go
@@ -8,7 +8,7 @@ import (
)
var (
- _ backend.DataStorageBackend = &MemoryMappedFile{}
+ _ backend.BackendStorageFile = &MemoryMappedFile{}
)
type MemoryMappedFile struct {
diff --git a/weed/storage/backend/s3_backend/s3_backend.go b/weed/storage/backend/s3_backend/s3_backend.go
index 69360806f..980e9e9d7 100644
--- a/weed/storage/backend/s3_backend/s3_backend.go
+++ b/weed/storage/backend/s3_backend/s3_backend.go
@@ -10,36 +10,77 @@ import (
"github.com/aws/aws-sdk-go/service/s3/s3iface"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/storage/backend"
- "github.com/chrislusf/seaweedfs/weed/storage/needle"
"github.com/chrislusf/seaweedfs/weed/util"
)
-var (
- _ backend.DataStorageBackend = &S3Backend{}
-)
-
func init() {
- backend.StorageBackends = append(backend.StorageBackends, &S3Backend{})
+ backend.BackendStorageFactories["s3"] = &S3BackendFactory{}
+}
+
+type S3BackendFactory struct {
}
-type S3Backend struct {
+func (factory *S3BackendFactory) StorageType() backend.StorageType {
+ return backend.StorageType("s3")
+}
+func (factory *S3BackendFactory) BuildStorage(configuration util.Configuration, id string) (backend.BackendStorage, error) {
+ return newS3BackendStorage(configuration, id)
+}
+
+type S3BackendStorage struct {
+ id string
conn s3iface.S3API
region string
bucket string
- vid needle.VolumeId
- key string
}
-func (s3backend S3Backend) ReadAt(p []byte, off int64) (n int, err error) {
+func newS3BackendStorage(configuration util.Configuration, id string) (s *S3BackendStorage, err error) {
+ s = &S3BackendStorage{}
+ s.id = id
+ s.conn, err = createSession(
+ configuration.GetString("aws_access_key_id"),
+ configuration.GetString("aws_secret_access_key"),
+ configuration.GetString("region"))
+ s.region = configuration.GetString("region")
+ s.bucket = configuration.GetString("bucket")
+
+ glog.V(0).Infof("created s3 backend storage %s for region %s bucket %s", s.Name(), s.region, s.bucket)
+ return
+}
+
+func (s *S3BackendStorage) Name() string {
+ return "s3." + s.id
+}
+
+func (s *S3BackendStorage) NewStorageFile(key string) backend.BackendStorageFile {
+ if strings.HasPrefix(key, "/") {
+ key = key[1:]
+ }
+
+ f := &S3BackendStorageFile{
+ backendStorage: s,
+ key: key,
+ }
+
+ return f
+}
+
+type S3BackendStorageFile struct {
+ backendStorage *S3BackendStorage
+ key string
+}
+
+func (s3backendStorageFile S3BackendStorageFile) ReadAt(p []byte, off int64) (n int, err error) {
bytesRange := fmt.Sprintf("bytes=%d-%d", off, off+int64(len(p))-1)
- getObjectOutput, getObjectErr := s3backend.conn.GetObject(&s3.GetObjectInput{
- Bucket: &s3backend.bucket,
- Key: &s3backend.key,
+ getObjectOutput, getObjectErr := s3backendStorageFile.backendStorage.conn.GetObject(&s3.GetObjectInput{
+ Bucket: &s3backendStorageFile.backendStorage.bucket,
+ Key: &s3backendStorageFile.key,
Range: &bytesRange,
})
if getObjectErr != nil {
- return 0, fmt.Errorf("bucket %s GetObject %s: %v", s3backend.bucket, s3backend.key, getObjectErr)
+ return 0, fmt.Errorf("bucket %s GetObject %s: %v",
+ s3backendStorageFile.backendStorage.bucket, s3backendStorageFile.key, getObjectErr)
}
defer getObjectOutput.Body.Close()
@@ -47,27 +88,28 @@ func (s3backend S3Backend) ReadAt(p []byte, off int64) (n int, err error) {
}
-func (s3backend S3Backend) WriteAt(p []byte, off int64) (n int, err error) {
+func (s3backendStorageFile S3BackendStorageFile) WriteAt(p []byte, off int64) (n int, err error) {
panic("implement me")
}
-func (s3backend S3Backend) Truncate(off int64) error {
+func (s3backendStorageFile S3BackendStorageFile) Truncate(off int64) error {
panic("implement me")
}
-func (s3backend S3Backend) Close() error {
+func (s3backendStorageFile S3BackendStorageFile) Close() error {
return nil
}
-func (s3backend S3Backend) GetStat() (datSize int64, modTime time.Time, err error) {
+func (s3backendStorageFile S3BackendStorageFile) GetStat() (datSize int64, modTime time.Time, err error) {
- headObjectOutput, headObjectErr := s3backend.conn.HeadObject(&s3.HeadObjectInput{
- Bucket: &s3backend.bucket,
- Key: &s3backend.key,
+ headObjectOutput, headObjectErr := s3backendStorageFile.backendStorage.conn.HeadObject(&s3.HeadObjectInput{
+ Bucket: &s3backendStorageFile.backendStorage.bucket,
+ Key: &s3backendStorageFile.key,
})
if headObjectErr != nil {
- return 0, time.Now(), fmt.Errorf("bucket %s HeadObject %s: %v", s3backend.bucket, s3backend.key, headObjectErr)
+ return 0, time.Now(), fmt.Errorf("bucket %s HeadObject %s: %v",
+ s3backendStorageFile.backendStorage.bucket, s3backendStorageFile.key, headObjectErr)
}
datSize = int64(*headObjectOutput.ContentLength)
@@ -76,44 +118,14 @@ func (s3backend S3Backend) GetStat() (datSize int64, modTime time.Time, err erro
return
}
-func (s3backend S3Backend) String() string {
- return fmt.Sprintf("%s/%s", s3backend.bucket, s3backend.key)
+func (s3backendStorageFile S3BackendStorageFile) String() string {
+ return s3backendStorageFile.key
}
-func (s3backend *S3Backend) GetName() string {
+func (s3backendStorageFile *S3BackendStorageFile) GetName() string {
return "s3"
}
-func (s3backend S3Backend) Instantiate(src *os.File) error {
+func (s3backendStorageFile S3BackendStorageFile) Instantiate(src *os.File) error {
panic("implement me")
}
-
-func (s3backend *S3Backend) Initialize(configuration util.Configuration, prefix string, vid needle.VolumeId) error {
- glog.V(0).Infof("storage.backend.s3.region: %v", configuration.GetString("region"))
- glog.V(0).Infof("storage.backend.s3.bucket: %v", configuration.GetString("bucket"))
- glog.V(0).Infof("storage.backend.s3.directory: %v", configuration.GetString("directory"))
-
- return s3backend.initialize(
- configuration.GetString("aws_access_key_id"),
- configuration.GetString("aws_secret_access_key"),
- configuration.GetString("region"),
- configuration.GetString("bucket"),
- prefix,
- vid,
- )
-}
-
-func (s3backend *S3Backend) initialize(awsAccessKeyId, awsSecretAccessKey, region, bucket string,
- prefix string, vid needle.VolumeId) (err error) {
- s3backend.region = region
- s3backend.bucket = bucket
- s3backend.conn, err = createSession(awsAccessKeyId, awsSecretAccessKey, region)
-
- s3backend.vid = vid
- s3backend.key = fmt.Sprintf("%s_%d.dat", prefix, vid)
- if strings.HasPrefix(s3backend.key, "/") {
- s3backend.key = s3backend.key[1:]
- }
-
- return err
-}
diff --git a/weed/storage/needle/needle_read_write.go b/weed/storage/needle/needle_read_write.go
index 8e5d18b1a..60301fa36 100644
--- a/weed/storage/needle/needle_read_write.go
+++ b/weed/storage/needle/needle_read_write.go
@@ -125,10 +125,10 @@ func (n *Needle) prepareWriteBuffer(version Version) ([]byte, uint32, int64, err
return writeBytes, 0, 0, fmt.Errorf("Unsupported Version! (%d)", version)
}
-func (n *Needle) Append(w backend.DataStorageBackend, version Version) (offset uint64, size uint32, actualSize int64, err error) {
+func (n *Needle) Append(w backend.BackendStorageFile, version Version) (offset uint64, size uint32, actualSize int64, err error) {
if end, _, e := w.GetStat(); e == nil {
- defer func(w backend.DataStorageBackend, off int64) {
+ defer func(w backend.BackendStorageFile, off int64) {
if err != nil {
if te := w.Truncate(end); te != nil {
glog.V(0).Infof("Failed to truncate %s back to %d with error: %v", w.String(), end, te)
@@ -150,7 +150,7 @@ func (n *Needle) Append(w backend.DataStorageBackend, version Version) (offset u
return offset, size, actualSize, err
}
-func ReadNeedleBlob(r backend.DataStorageBackend, offset int64, size uint32, version Version) (dataSlice []byte, err error) {
+func ReadNeedleBlob(r backend.BackendStorageFile, offset int64, size uint32, version Version) (dataSlice []byte, err error) {
dataSize := GetActualSize(size, version)
dataSlice = make([]byte, int(dataSize))
@@ -191,7 +191,7 @@ func (n *Needle) ReadBytes(bytes []byte, offset int64, size uint32, version Vers
}
// ReadData hydrates the needle from the file, with only n.Id is set.
-func (n *Needle) ReadData(r backend.DataStorageBackend, offset int64, size uint32, version Version) (err error) {
+func (n *Needle) ReadData(r backend.BackendStorageFile, offset int64, size uint32, version Version) (err error) {
bytes, err := ReadNeedleBlob(r, offset, size, version)
if err != nil {
return err
@@ -266,7 +266,7 @@ func (n *Needle) readNeedleDataVersion2(bytes []byte) (err error) {
return nil
}
-func ReadNeedleHeader(r backend.DataStorageBackend, version Version, offset int64) (n *Needle, bytes []byte, bodyLength int64, err error) {
+func ReadNeedleHeader(r backend.BackendStorageFile, version Version, offset int64) (n *Needle, bytes []byte, bodyLength int64, err error) {
n = new(Needle)
if version == Version1 || version == Version2 || version == Version3 {
bytes = make([]byte, NeedleHeaderSize)
@@ -301,7 +301,7 @@ func NeedleBodyLength(needleSize uint32, version Version) int64 {
//n should be a needle already read the header
//the input stream will read until next file entry
-func (n *Needle) ReadNeedleBody(r backend.DataStorageBackend, version Version, offset int64, bodyLength int64) (bytes []byte, err error) {
+func (n *Needle) ReadNeedleBody(r backend.BackendStorageFile, version Version, offset int64, bodyLength int64) (bytes []byte, err error) {
if bodyLength <= 0 {
return nil, nil
diff --git a/weed/storage/volume.go b/weed/storage/volume.go
index e85696eab..00d76a4b3 100644
--- a/weed/storage/volume.go
+++ b/weed/storage/volume.go
@@ -21,7 +21,7 @@ type Volume struct {
Id needle.VolumeId
dir string
Collection string
- DataBackend backend.DataStorageBackend
+ DataBackend backend.BackendStorageFile
nm NeedleMapper
needleMapKind NeedleMapType
readOnly bool
diff --git a/weed/storage/volume_checking.go b/weed/storage/volume_checking.go
index 61b59e9f7..a65c2a3ff 100644
--- a/weed/storage/volume_checking.go
+++ b/weed/storage/volume_checking.go
@@ -55,7 +55,7 @@ func readIndexEntryAtOffset(indexFile *os.File, offset int64) (bytes []byte, err
return
}
-func verifyNeedleIntegrity(datFile backend.DataStorageBackend, v needle.Version, offset int64, key NeedleId, size uint32) (lastAppendAtNs uint64, err error) {
+func verifyNeedleIntegrity(datFile backend.BackendStorageFile, v needle.Version, offset int64, key NeedleId, size uint32) (lastAppendAtNs uint64, err error) {
n := new(needle.Needle)
if err = n.ReadData(datFile, offset, size, v); err != nil {
return n.AppendAtNs, err
diff --git a/weed/storage/volume_create.go b/weed/storage/volume_create.go
index b27a62990..ee41c50a9 100644
--- a/weed/storage/volume_create.go
+++ b/weed/storage/volume_create.go
@@ -9,7 +9,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/storage/backend"
)
-func createVolumeFile(fileName string, preallocate int64, memoryMapSizeMB uint32) (backend.DataStorageBackend, error) {
+func createVolumeFile(fileName string, preallocate int64, memoryMapSizeMB uint32) (backend.BackendStorageFile, error) {
file, e := os.OpenFile(fileName, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0644)
if preallocate > 0 {
glog.V(0).Infof("Preallocated disk space for %s is not supported", fileName)
diff --git a/weed/storage/volume_create_linux.go b/weed/storage/volume_create_linux.go
index e3305d991..5fafbe924 100644
--- a/weed/storage/volume_create_linux.go
+++ b/weed/storage/volume_create_linux.go
@@ -10,7 +10,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/storage/backend"
)
-func createVolumeFile(fileName string, preallocate int64, memoryMapSizeMB uint32) (backend.DataStorageBackend, error) {
+func createVolumeFile(fileName string, preallocate int64, memoryMapSizeMB uint32) (backend.BackendStorageFile, error) {
file, e := os.OpenFile(fileName, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0644)
if preallocate != 0 {
syscall.Fallocate(int(file.Fd()), 1, 0, preallocate)
diff --git a/weed/storage/volume_create_windows.go b/weed/storage/volume_create_windows.go
index 81536810b..9e5d8f87d 100644
--- a/weed/storage/volume_create_windows.go
+++ b/weed/storage/volume_create_windows.go
@@ -11,7 +11,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/storage/backend/memory_map/os_overloads"
)
-func createVolumeFile(fileName string, preallocate int64, memoryMapSizeMB uint32) (backend.DataStorageBackend, error) {
+func createVolumeFile(fileName string, preallocate int64, memoryMapSizeMB uint32) (backend.BackendStorageFile, error) {
if preallocate > 0 {
glog.V(0).Infof("Preallocated disk space for %s is not supported", fileName)
diff --git a/weed/storage/volume_read_write.go b/weed/storage/volume_read_write.go
index 242325755..c1758c18b 100644
--- a/weed/storage/volume_read_write.go
+++ b/weed/storage/volume_read_write.go
@@ -195,7 +195,7 @@ func ScanVolumeFile(dirname string, collection string, id needle.VolumeId,
return ScanVolumeFileFrom(version, v.DataBackend, offset, volumeFileScanner)
}
-func ScanVolumeFileFrom(version needle.Version, datBackend backend.DataStorageBackend, offset int64, volumeFileScanner VolumeFileScanner) (err error) {
+func ScanVolumeFileFrom(version needle.Version, datBackend backend.BackendStorageFile, offset int64, volumeFileScanner VolumeFileScanner) (err error) {
n, nh, rest, e := needle.ReadNeedleHeader(datBackend, version, offset)
if e != nil {
if e == io.EOF {
diff --git a/weed/storage/volume_super_block.go b/weed/storage/volume_super_block.go
index bce5af465..911cd5455 100644
--- a/weed/storage/volume_super_block.go
+++ b/weed/storage/volume_super_block.go
@@ -104,7 +104,7 @@ func (v *Volume) readSuperBlock() (err error) {
}
// ReadSuperBlock reads from data file and load it into volume's super block
-func ReadSuperBlock(datBackend backend.DataStorageBackend) (superBlock SuperBlock, err error) {
+func ReadSuperBlock(datBackend backend.BackendStorageFile) (superBlock SuperBlock, err error) {
header := make([]byte, _SuperBlockSize)
if _, e := datBackend.ReadAt(header, 0); e != nil {
diff --git a/weed/storage/volume_vacuum.go b/weed/storage/volume_vacuum.go
index e90746b54..caa1777e4 100644
--- a/weed/storage/volume_vacuum.go
+++ b/weed/storage/volume_vacuum.go
@@ -132,7 +132,7 @@ func (v *Volume) cleanupCompact() error {
return nil
}
-func fetchCompactRevisionFromDatFile(datBackend backend.DataStorageBackend) (compactRevision uint16, err error) {
+func fetchCompactRevisionFromDatFile(datBackend backend.BackendStorageFile) (compactRevision uint16, err error) {
superBlock, err := ReadSuperBlock(datBackend)
if err != nil {
return 0, err
@@ -270,7 +270,7 @@ func (v *Volume) makeupDiff(newDatFileName, newIdxFileName, oldDatFileName, oldI
type VolumeFileScanner4Vacuum struct {
version needle.Version
v *Volume
- dstBackend backend.DataStorageBackend
+ dstBackend backend.BackendStorageFile
nm *NeedleMap
newOffset int64
now uint64
@@ -312,7 +312,7 @@ func (scanner *VolumeFileScanner4Vacuum) VisitNeedle(n *needle.Needle, offset in
func (v *Volume) copyDataAndGenerateIndexFile(dstName, idxName string, preallocate int64, compactionBytePerSecond int64) (err error) {
var (
- dst backend.DataStorageBackend
+ dst backend.BackendStorageFile
idx *os.File
)
if dst, err = createVolumeFile(dstName, preallocate, 0); err != nil {