-rw-r--r--  weed/command/backup.go | 4
-rw-r--r--  weed/command/compact.go | 3
-rw-r--r--  weed/command/server.go | 2
-rw-r--r--  weed/command/volume.go | 2
-rw-r--r--  weed/server/volume_grpc_copy.go | 31
-rw-r--r--  weed/server/volume_grpc_erasure_coding.go | 69
-rw-r--r--  weed/server/volume_grpc_tier_download.go | 4
-rw-r--r--  weed/server/volume_grpc_tier_upload.go | 2
-rw-r--r--  weed/shell/command_volume_fsck.go | 4
-rw-r--r--  weed/storage/disk_location.go | 10
-rw-r--r--  weed/storage/disk_location_ec.go | 9
-rw-r--r--  weed/storage/erasure_coding/ec_decoder.go | 8
-rw-r--r--  weed/storage/erasure_coding/ec_volume.go | 41
-rw-r--r--  weed/storage/needle_map_sorted_file.go | 10
-rw-r--r--  weed/storage/store.go | 2
-rw-r--r--  weed/storage/volume.go | 22
-rw-r--r--  weed/storage/volume_backup.go | 12
-rw-r--r--  weed/storage/volume_loading.go | 65
-rw-r--r--  weed/storage/volume_read_write.go | 3
-rw-r--r--  weed/storage/volume_tier.go | 4
-rw-r--r--  weed/storage/volume_vacuum.go | 30
-rw-r--r--  weed/storage/volume_vacuum_test.go | 4
22 files changed, 196 insertions, 145 deletions
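This change lets a volume server keep index files in a separate directory from the bulk data: NewVolume and NewEcVolume now take both a data directory and an index directory, the single FileName() accessor becomes an extension-aware FileName(ext), and callers that scan or copy files are updated to look in location.IdxDirectory as well as location.Directory. The new dir.idx / volume.dir.idx flags exist so the small index files can live on a different (for example faster) disk than the .dat files. As a rough orientation, the routing convention the diff introduces looks like the following standalone sketch; the type and helper names (volumePaths, fileName) and the example paths are illustrative only, not the SeaweedFS API:

package main

import (
	"fmt"
	"path/filepath"
)

// volumePaths mirrors the idea in this change: a volume keeps its data files
// in one directory and (optionally) its index files in another.
type volumePaths struct {
	dir    string // holds .dat, .cpd, .vif and ec data shards
	dirIdx string // holds .idx, .cpx, .ldb (and .ecx/.ecj for ec volumes)
	name   string // base file name, e.g. "1" or "collection_1"
}

// fileName routes an extension to the matching directory, in the spirit of
// the new Volume.FileName(ext) / EcVolume.FileName(ext) accessors below.
func (p volumePaths) fileName(ext string) string {
	switch ext {
	case ".idx", ".cpx", ".ldb", ".ecx", ".ecj":
		return filepath.Join(p.dirIdx, p.name) + ext
	}
	// everything else (.dat, .cpd, .vif, .ec00 ... .ec13) stays with the data
	return filepath.Join(p.dir, p.name) + ext
}

func main() {
	p := volumePaths{dir: "/data", dirIdx: "/index", name: "1"}
	fmt.Println(p.fileName(".dat")) // /data/1.dat
	fmt.Println(p.fileName(".idx")) // /index/1.idx
}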
diff --git a/weed/command/backup.go b/weed/command/backup.go
index 950cbf68e..4c37c2763 100644
--- a/weed/command/backup.go
+++ b/weed/command/backup.go
@@ -112,7 +112,7 @@ func runBackup(cmd *Command, args []string) bool {
return true
}
}
- v, err := storage.NewVolume(util.ResolvePath(*s.dir), *s.collection, vid, storage.NeedleMapInMemory, replication, ttl, 0, 0)
+ v, err := storage.NewVolume(util.ResolvePath(*s.dir), util.ResolvePath(*s.dir), *s.collection, vid, storage.NeedleMapInMemory, replication, ttl, 0, 0)
if err != nil {
fmt.Printf("Error creating or reading from volume %d: %v\n", vid, err)
return true
@@ -137,7 +137,7 @@ func runBackup(cmd *Command, args []string) bool {
// remove the old data
v.Destroy()
// recreate an empty volume
- v, err = storage.NewVolume(util.ResolvePath(*s.dir), *s.collection, vid, storage.NeedleMapInMemory, replication, ttl, 0, 0)
+ v, err = storage.NewVolume(util.ResolvePath(*s.dir), util.ResolvePath(*s.dir), *s.collection, vid, storage.NeedleMapInMemory, replication, ttl, 0, 0)
if err != nil {
fmt.Printf("Error creating or reading from volume %d: %v\n", vid, err)
return true
diff --git a/weed/command/compact.go b/weed/command/compact.go
index 6117cf9f3..92e25f474 100644
--- a/weed/command/compact.go
+++ b/weed/command/compact.go
@@ -41,8 +41,7 @@ func runCompact(cmd *Command, args []string) bool {
preallocate := *compactVolumePreallocate * (1 << 20)
vid := needle.VolumeId(*compactVolumeId)
- v, err := storage.NewVolume(util.ResolvePath(*compactVolumePath), *compactVolumeCollection, vid,
- storage.NeedleMapInMemory, nil, nil, preallocate, 0)
+ v, err := storage.NewVolume(util.ResolvePath(*compactVolumePath), util.ResolvePath(*compactVolumePath), *compactVolumeCollection, vid, storage.NeedleMapInMemory, nil, nil, preallocate, 0)
if err != nil {
glog.Fatalf("Load Volume [ERROR] %s\n", err)
}
diff --git a/weed/command/server.go b/weed/command/server.go
index edf626fe7..0c6731eb2 100644
--- a/weed/command/server.go
+++ b/weed/command/server.go
@@ -105,7 +105,7 @@ func init() {
serverOptions.v.publicUrl = cmdServer.Flag.String("volume.publicUrl", "", "publicly accessible address")
serverOptions.v.preStopSeconds = cmdServer.Flag.Int("volume.preStopSeconds", 10, "number of seconds between stop send heartbeats and stop volume server")
serverOptions.v.pprof = cmdServer.Flag.Bool("volume.pprof", false, "enable pprof http handlers. precludes --memprofile and --cpuprofile")
- serverOptions.v.idxFolder = cmdServer.Flag.String("volume.dir.idx", "", "WIP directory to store .idx files")
+ serverOptions.v.idxFolder = cmdServer.Flag.String("volume.dir.idx", "", "directory to store .idx files")
s3Options.port = cmdServer.Flag.Int("s3.port", 8333, "s3 server http listen port")
s3Options.domainName = cmdServer.Flag.String("s3.domainName", "", "suffix of the host name in comma separated list, {bucket}.{domainName}")
diff --git a/weed/command/volume.go b/weed/command/volume.go
index fec783514..9597e843a 100644
--- a/weed/command/volume.go
+++ b/weed/command/volume.go
@@ -84,7 +84,7 @@ func init() {
v.fileSizeLimitMB = cmdVolume.Flag.Int("fileSizeLimitMB", 256, "limit file size to avoid out of memory")
v.pprof = cmdVolume.Flag.Bool("pprof", false, "enable pprof http handlers. precludes --memprofile and --cpuprofile")
v.metricsHttpPort = cmdVolume.Flag.Int("metricsPort", 0, "Prometheus metrics listen port")
- v.idxFolder = cmdVolume.Flag.String("dir.idx", "", "WIP directory to store .idx files")
+ v.idxFolder = cmdVolume.Flag.String("dir.idx", "", "directory to store .idx files")
}
var cmdVolume = &Command{
diff --git a/weed/server/volume_grpc_copy.go b/weed/server/volume_grpc_copy.go
index 2aecb140f..cfa3710a8 100644
--- a/weed/server/volume_grpc_copy.go
+++ b/weed/server/volume_grpc_copy.go
@@ -48,7 +48,7 @@ func (vs *VolumeServer) VolumeCopy(ctx context.Context, req *volume_server_pb.Vo
// send .dat file
// confirm size and timestamp
var volFileInfoResp *volume_server_pb.ReadVolumeFileStatusResponse
- var volumeFileName, idxFileName, datFileName string
+ var dataBaseFileName, indexBaseFileName, idxFileName, datFileName string
err := operation.WithVolumeServerClient(req.SourceDataNode, vs.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
var err error
volFileInfoResp, err = client.ReadVolumeFileStatus(context.Background(),
@@ -59,24 +59,25 @@ func (vs *VolumeServer) VolumeCopy(ctx context.Context, req *volume_server_pb.Vo
return fmt.Errorf("read volume file status failed, %v", err)
}
- volumeFileName = storage.VolumeFileName(location.Directory, volFileInfoResp.Collection, int(req.VolumeId))
+ dataBaseFileName = storage.VolumeFileName(location.Directory, volFileInfoResp.Collection, int(req.VolumeId))
+ indexBaseFileName = storage.VolumeFileName(location.IdxDirectory, volFileInfoResp.Collection, int(req.VolumeId))
- ioutil.WriteFile(volumeFileName+".note", []byte(fmt.Sprintf("copying from %s", req.SourceDataNode)), 0755)
+ ioutil.WriteFile(dataBaseFileName+".note", []byte(fmt.Sprintf("copying from %s", req.SourceDataNode)), 0755)
// println("source:", volFileInfoResp.String())
- if err := vs.doCopyFile(client, false, req.Collection, req.VolumeId, volFileInfoResp.CompactionRevision, volFileInfoResp.DatFileSize, volumeFileName, ".dat", false, true); err != nil {
+ if err := vs.doCopyFile(client, false, req.Collection, req.VolumeId, volFileInfoResp.CompactionRevision, volFileInfoResp.DatFileSize, dataBaseFileName, ".dat", false, true); err != nil {
return err
}
- if err := vs.doCopyFile(client, false, req.Collection, req.VolumeId, volFileInfoResp.CompactionRevision, volFileInfoResp.IdxFileSize, volumeFileName, ".idx", false, false); err != nil {
+ if err := vs.doCopyFile(client, false, req.Collection, req.VolumeId, volFileInfoResp.CompactionRevision, volFileInfoResp.IdxFileSize, indexBaseFileName, ".idx", false, false); err != nil {
return err
}
- if err := vs.doCopyFile(client, false, req.Collection, req.VolumeId, volFileInfoResp.CompactionRevision, volFileInfoResp.DatFileSize, volumeFileName, ".vif", false, true); err != nil {
+ if err := vs.doCopyFile(client, false, req.Collection, req.VolumeId, volFileInfoResp.CompactionRevision, volFileInfoResp.DatFileSize, dataBaseFileName, ".vif", false, true); err != nil {
return err
}
- os.Remove(volumeFileName + ".note")
+ os.Remove(dataBaseFileName + ".note")
return nil
})
@@ -84,18 +85,18 @@ func (vs *VolumeServer) VolumeCopy(ctx context.Context, req *volume_server_pb.Vo
if err != nil {
return nil, err
}
- if volumeFileName == "" {
+ if dataBaseFileName == "" {
return nil, fmt.Errorf("not found volume %d file", req.VolumeId)
}
- idxFileName = volumeFileName + ".idx"
- datFileName = volumeFileName + ".dat"
+ idxFileName = indexBaseFileName + ".idx"
+ datFileName = dataBaseFileName + ".dat"
defer func() {
- if err != nil && volumeFileName != "" {
+ if err != nil && dataBaseFileName != "" {
os.Remove(idxFileName)
os.Remove(datFileName)
- os.Remove(volumeFileName + ".vif")
+ os.Remove(dataBaseFileName + ".vif")
}
}()
@@ -223,7 +224,7 @@ func (vs *VolumeServer) CopyFile(req *volume_server_pb.CopyFileRequest, stream v
if uint32(v.CompactionRevision) != req.CompactionRevision && req.CompactionRevision != math.MaxUint32 {
return fmt.Errorf("volume %d is compacted", req.VolumeId)
}
- fileName = v.FileName() + req.Ext
+ fileName = v.FileName(req.Ext)
} else {
baseFileName := erasure_coding.EcShardBaseFileName(req.Collection, int(req.VolumeId)) + req.Ext
for _, location := range vs.store.Locations {
@@ -231,6 +232,10 @@ func (vs *VolumeServer) CopyFile(req *volume_server_pb.CopyFileRequest, stream v
if util.FileExists(tName) {
fileName = tName
}
+ tName = util.Join(location.IdxDirectory, baseFileName)
+ if util.FileExists(tName) {
+ fileName = tName
+ }
}
if fileName == "" {
if req.IgnoreSourceFileNotFound {
diff --git a/weed/server/volume_grpc_erasure_coding.go b/weed/server/volume_grpc_erasure_coding.go
index 55e0261c8..186c6eafc 100644
--- a/weed/server/volume_grpc_erasure_coding.go
+++ b/weed/server/volume_grpc_erasure_coding.go
@@ -8,7 +8,6 @@ import (
"math"
"os"
"path"
- "path/filepath"
"strings"
"github.com/chrislusf/seaweedfs/weed/glog"
@@ -44,7 +43,7 @@ func (vs *VolumeServer) VolumeEcShardsGenerate(ctx context.Context, req *volume_
if v == nil {
return nil, fmt.Errorf("volume %d not found", req.VolumeId)
}
- baseFileName := v.FileName()
+ baseFileName := v.DataFileName()
if v.Collection != req.Collection {
return nil, fmt.Errorf("existing collection:%v unexpected input: %v", v.Collection, req.Collection)
@@ -56,8 +55,8 @@ func (vs *VolumeServer) VolumeEcShardsGenerate(ctx context.Context, req *volume_
}
// write .ecx file
- if err := erasure_coding.WriteSortedFileFromIdx(baseFileName, ".ecx"); err != nil {
- return nil, fmt.Errorf("WriteSortedFileFromIdx %s: %v", baseFileName, err)
+ if err := erasure_coding.WriteSortedFileFromIdx(v.IndexFileName(), ".ecx"); err != nil {
+ return nil, fmt.Errorf("WriteSortedFileFromIdx %s: %v", v.IndexFileName(), err)
}
// write .vif files
@@ -78,17 +77,18 @@ func (vs *VolumeServer) VolumeEcShardsRebuild(ctx context.Context, req *volume_s
var rebuiltShardIds []uint32
for _, location := range vs.store.Locations {
- if util.FileExists(path.Join(location.Directory, baseFileName+".ecx")) {
+ if util.FileExists(path.Join(location.IdxDirectory, baseFileName+".ecx")) {
// write .ec00 ~ .ec13 files
- baseFileName = path.Join(location.Directory, baseFileName)
- if generatedShardIds, err := erasure_coding.RebuildEcFiles(baseFileName); err != nil {
- return nil, fmt.Errorf("RebuildEcFiles %s: %v", baseFileName, err)
+ dataBaseFileName := path.Join(location.Directory, baseFileName)
+ if generatedShardIds, err := erasure_coding.RebuildEcFiles(dataBaseFileName); err != nil {
+ return nil, fmt.Errorf("RebuildEcFiles %s: %v", dataBaseFileName, err)
} else {
rebuiltShardIds = generatedShardIds
}
- if err := erasure_coding.RebuildEcxFile(baseFileName); err != nil {
- return nil, fmt.Errorf("RebuildEcxFile %s: %v", baseFileName, err)
+ indexBaseFileName := path.Join(location.IdxDirectory, baseFileName)
+ if err := erasure_coding.RebuildEcxFile(indexBaseFileName); err != nil {
+ return nil, fmt.Errorf("RebuildEcxFile %s: %v", dataBaseFileName, err)
}
break
@@ -110,13 +110,14 @@ func (vs *VolumeServer) VolumeEcShardsCopy(ctx context.Context, req *volume_serv
return nil, fmt.Errorf("no space left")
}
- baseFileName := storage.VolumeFileName(location.Directory, req.Collection, int(req.VolumeId))
+ dataBaseFileName := storage.VolumeFileName(location.Directory, req.Collection, int(req.VolumeId))
+ indexBaseFileName := storage.VolumeFileName(location.Directory, req.Collection, int(req.VolumeId))
err := operation.WithVolumeServerClient(req.SourceDataNode, vs.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
// copy ec data slices
for _, shardId := range req.ShardIds {
- if err := vs.doCopyFile(client, true, req.Collection, req.VolumeId, math.MaxUint32, math.MaxInt64, baseFileName, erasure_coding.ToExt(int(shardId)), false, false); err != nil {
+ if err := vs.doCopyFile(client, true, req.Collection, req.VolumeId, math.MaxUint32, math.MaxInt64, dataBaseFileName, erasure_coding.ToExt(int(shardId)), false, false); err != nil {
return err
}
}
@@ -124,7 +125,7 @@ func (vs *VolumeServer) VolumeEcShardsCopy(ctx context.Context, req *volume_serv
if req.CopyEcxFile {
// copy ecx file
- if err := vs.doCopyFile(client, true, req.Collection, req.VolumeId, math.MaxUint32, math.MaxInt64, baseFileName, ".ecx", false, false); err != nil {
+ if err := vs.doCopyFile(client, true, req.Collection, req.VolumeId, math.MaxUint32, math.MaxInt64, indexBaseFileName, ".ecx", false, false); err != nil {
return err
}
return nil
@@ -132,14 +133,14 @@ func (vs *VolumeServer) VolumeEcShardsCopy(ctx context.Context, req *volume_serv
if req.CopyEcjFile {
// copy ecj file
- if err := vs.doCopyFile(client, true, req.Collection, req.VolumeId, math.MaxUint32, math.MaxInt64, baseFileName, ".ecj", true, true); err != nil {
+ if err := vs.doCopyFile(client, true, req.Collection, req.VolumeId, math.MaxUint32, math.MaxInt64, indexBaseFileName, ".ecj", true, true); err != nil {
return err
}
}
if req.CopyVifFile {
// copy vif file
- if err := vs.doCopyFile(client, true, req.Collection, req.VolumeId, math.MaxUint32, math.MaxInt64, baseFileName, ".vif", false, true); err != nil {
+ if err := vs.doCopyFile(client, true, req.Collection, req.VolumeId, math.MaxUint32, math.MaxInt64, dataBaseFileName, ".vif", false, true); err != nil {
return err
}
}
@@ -157,17 +158,19 @@ func (vs *VolumeServer) VolumeEcShardsCopy(ctx context.Context, req *volume_serv
// the shard should not be mounted before calling this.
func (vs *VolumeServer) VolumeEcShardsDelete(ctx context.Context, req *volume_server_pb.VolumeEcShardsDeleteRequest) (*volume_server_pb.VolumeEcShardsDeleteResponse, error) {
- baseFilename := erasure_coding.EcShardBaseFileName(req.Collection, int(req.VolumeId))
+ bName := erasure_coding.EcShardBaseFileName(req.Collection, int(req.VolumeId))
glog.V(0).Infof("ec volume %d shard delete %v", req.VolumeId, req.ShardIds)
found := false
+ var indexBaseFilename, dataBaseFilename string
for _, location := range vs.store.Locations {
- if util.FileExists(path.Join(location.Directory, baseFilename+".ecx")) {
+ if util.FileExists(path.Join(location.IdxDirectory, bName+".ecx")) {
found = true
- baseFilename = path.Join(location.Directory, baseFilename)
+ indexBaseFilename = path.Join(location.IdxDirectory, bName)
+ dataBaseFilename = path.Join(location.Directory, bName)
for _, shardId := range req.ShardIds {
- os.Remove(baseFilename + erasure_coding.ToExt(int(shardId)))
+ os.Remove(dataBaseFilename + erasure_coding.ToExt(int(shardId)))
}
break
}
@@ -182,12 +185,18 @@ func (vs *VolumeServer) VolumeEcShardsDelete(ctx context.Context, req *volume_se
hasIdxFile := false
existingShardCount := 0
- bName := filepath.Base(baseFilename)
for _, location := range vs.store.Locations {
fileInfos, err := ioutil.ReadDir(location.Directory)
if err != nil {
continue
}
+ if location.IdxDirectory != location.Directory {
+ idxFileInfos, err := ioutil.ReadDir(location.IdxDirectory)
+ if err != nil {
+ continue
+ }
+ fileInfos = append(fileInfos, idxFileInfos...)
+ }
for _, fileInfo := range fileInfos {
if fileInfo.Name() == bName+".ecx" || fileInfo.Name() == bName+".ecj" {
hasEcxFile = true
@@ -204,14 +213,14 @@ func (vs *VolumeServer) VolumeEcShardsDelete(ctx context.Context, req *volume_se
}
if hasEcxFile && existingShardCount == 0 {
- if err := os.Remove(baseFilename + ".ecx"); err != nil {
+ if err := os.Remove(indexBaseFilename + ".ecx"); err != nil {
return nil, err
}
- os.Remove(baseFilename + ".ecj")
+ os.Remove(indexBaseFilename + ".ecj")
}
if !hasIdxFile {
// .vif is used for ec volumes and normal volumes
- os.Remove(baseFilename + ".vif")
+ os.Remove(dataBaseFilename + ".vif")
}
return &volume_server_pb.VolumeEcShardsDeleteResponse{}, nil
@@ -365,26 +374,26 @@ func (vs *VolumeServer) VolumeEcShardsToVolume(ctx context.Context, req *volume_
if !found {
return nil, fmt.Errorf("ec volume %d not found", req.VolumeId)
}
- baseFileName := v.FileName()
if v.Collection != req.Collection {
return nil, fmt.Errorf("existing collection:%v unexpected input: %v", v.Collection, req.Collection)
}
+ dataBaseFileName, indexBaseFileName := v.DataBaseFileName(), v.IndexBaseFileName()
// calculate .dat file size
- datFileSize, err := erasure_coding.FindDatFileSize(baseFileName)
+ datFileSize, err := erasure_coding.FindDatFileSize(dataBaseFileName, indexBaseFileName)
if err != nil {
- return nil, fmt.Errorf("FindDatFileSize %s: %v", baseFileName, err)
+ return nil, fmt.Errorf("FindDatFileSize %s: %v", dataBaseFileName, err)
}
// write .dat file from .ec00 ~ .ec09 files
- if err := erasure_coding.WriteDatFile(baseFileName, datFileSize); err != nil {
- return nil, fmt.Errorf("WriteEcFiles %s: %v", baseFileName, err)
+ if err := erasure_coding.WriteDatFile(dataBaseFileName, datFileSize); err != nil {
+ return nil, fmt.Errorf("WriteEcFiles %s: %v", dataBaseFileName, err)
}
// write .idx file from .ecx and .ecj files
- if err := erasure_coding.WriteIdxFileFromEcIndex(baseFileName); err != nil {
- return nil, fmt.Errorf("WriteIdxFileFromEcIndex %s: %v", baseFileName, err)
+ if err := erasure_coding.WriteIdxFileFromEcIndex(indexBaseFileName); err != nil {
+ return nil, fmt.Errorf("WriteIdxFileFromEcIndex %s: %v", v.IndexBaseFileName(), err)
}
return &volume_server_pb.VolumeEcShardsToVolumeResponse{}, nil
diff --git a/weed/server/volume_grpc_tier_download.go b/weed/server/volume_grpc_tier_download.go
index 7b3982e40..73d8ae7cb 100644
--- a/weed/server/volume_grpc_tier_download.go
+++ b/weed/server/volume_grpc_tier_download.go
@@ -58,9 +58,9 @@ func (vs *VolumeServer) VolumeTierMoveDatFromRemote(req *volume_server_pb.Volume
})
}
// copy the data file
- _, err := backendStorage.DownloadFile(v.FileName()+".dat", storageKey, fn)
+ _, err := backendStorage.DownloadFile(v.FileName(".dat"), storageKey, fn)
if err != nil {
- return fmt.Errorf("backend %s copy file %s: %v", storageName, v.FileName()+".dat", err)
+ return fmt.Errorf("backend %s copy file %s: %v", storageName, v.FileName(".dat"), err)
}
if req.KeepRemoteDatFile {
diff --git a/weed/server/volume_grpc_tier_upload.go b/weed/server/volume_grpc_tier_upload.go
index c9694df59..e51de5f1d 100644
--- a/weed/server/volume_grpc_tier_upload.go
+++ b/weed/server/volume_grpc_tier_upload.go
@@ -93,7 +93,7 @@ func (vs *VolumeServer) VolumeTierMoveDatToRemote(req *volume_server_pb.VolumeTi
}
if !req.KeepLocalDatFile {
- os.Remove(v.FileName() + ".dat")
+ os.Remove(v.FileName(".dat"))
}
return nil
diff --git a/weed/shell/command_volume_fsck.go b/weed/shell/command_volume_fsck.go
index 4031cd237..75b0f28da 100644
--- a/weed/shell/command_volume_fsck.go
+++ b/weed/shell/command_volume_fsck.go
@@ -157,12 +157,12 @@ func (c *commandVolumeFsck) collectOneVolumeFileIds(tempFolder string, volumeId
IgnoreSourceFileNotFound: false,
})
if err != nil {
- return fmt.Errorf("failed to start copying volume %d.idx: %v", volumeId, err)
+ return fmt.Errorf("failed to start copying volume %d%s: %v", volumeId, ext, err)
}
err = writeToFile(copyFileClient, getVolumeFileIdFile(tempFolder, volumeId))
if err != nil {
- return fmt.Errorf("failed to copy %d.idx from %s: %v", volumeId, vinfo.server, err)
+ return fmt.Errorf("failed to copy %d%s from %s: %v", volumeId, ext, vinfo.server, err)
}
return nil
diff --git a/weed/storage/disk_location.go b/weed/storage/disk_location.go
index 7b68a86fc..2d4d120af 100644
--- a/weed/storage/disk_location.go
+++ b/weed/storage/disk_location.go
@@ -53,8 +53,8 @@ func NewDiskLocation(dir string, maxVolumeCount int, minFreeSpacePercent float32
}
func volumeIdFromFileName(filename string) (needle.VolumeId, string, error) {
- if strings.HasSuffix(filename, ".idx") {
- base := filename[:len(filename)-len(".idx")]
+ if strings.HasSuffix(filename, ".idx") || strings.HasSuffix(filename, ".vif") {
+ base := filename[:len(filename)-4]
collection, volumeId, err := parseCollectionVolumeId(base)
return volumeId, collection, err
}
@@ -76,10 +76,10 @@ func (l *DiskLocation) loadExistingVolume(fileInfo os.FileInfo, needleMapKind Ne
if fileInfo.IsDir() {
return false
}
- if !strings.HasSuffix(basename, ".idx") {
+ if !strings.HasSuffix(basename, ".idx") && !strings.HasSuffix(basename, ".vif") {
return false
}
- volumeName := basename[:len(basename)-len(".idx")]
+ volumeName := basename[:len(basename)-4]
// check for incomplete volume
noteFile := l.Directory + "/" + volumeName + ".note"
@@ -108,7 +108,7 @@ func (l *DiskLocation) loadExistingVolume(fileInfo os.FileInfo, needleMapKind Ne
}
// load the volume
- v, e := NewVolume(l.Directory, collection, vid, needleMapKind, nil, nil, 0, 0)
+ v, e := NewVolume(l.Directory, l.IdxDirectory, collection, vid, needleMapKind, nil, nil, 0, 0)
if e != nil {
glog.V(0).Infof("new volume %s error %s", volumeName, e)
return false
diff --git a/weed/storage/disk_location_ec.go b/weed/storage/disk_location_ec.go
index 07fab96d9..d1237b40f 100644
--- a/weed/storage/disk_location_ec.go
+++ b/weed/storage/disk_location_ec.go
@@ -68,7 +68,7 @@ func (l *DiskLocation) LoadEcShard(collection string, vid needle.VolumeId, shard
defer l.ecVolumesLock.Unlock()
ecVolume, found := l.ecVolumes[vid]
if !found {
- ecVolume, err = erasure_coding.NewEcVolume(l.Directory, collection, vid)
+ ecVolume, err = erasure_coding.NewEcVolume(l.Directory, l.IdxDirectory, collection, vid)
if err != nil {
return fmt.Errorf("failed to create ec volume %d: %v", vid, err)
}
@@ -122,6 +122,13 @@ func (l *DiskLocation) loadAllEcShards() (err error) {
if err != nil {
return fmt.Errorf("load all ec shards in dir %s: %v", l.Directory, err)
}
+ if l.IdxDirectory != l.Directory {
+ indexFileInfos, err := ioutil.ReadDir(l.IdxDirectory)
+ if err != nil {
+ return fmt.Errorf("load all ec shards in dir %s: %v", l.IdxDirectory, err)
+ }
+ fileInfos = append(fileInfos, indexFileInfos...)
+ }
sort.Slice(fileInfos, func(i, j int) bool {
return fileInfos[i].Name() < fileInfos[j].Name()
diff --git a/weed/storage/erasure_coding/ec_decoder.go b/weed/storage/erasure_coding/ec_decoder.go
index 795a7d523..bc86d9c04 100644
--- a/weed/storage/erasure_coding/ec_decoder.go
+++ b/weed/storage/erasure_coding/ec_decoder.go
@@ -45,14 +45,14 @@ func WriteIdxFileFromEcIndex(baseFileName string) (err error) {
// FindDatFileSize calculate .dat file size from max offset entry
// there may be extra deletions after that entry
// but they are deletions anyway
-func FindDatFileSize(baseFileName string) (datSize int64, err error) {
+func FindDatFileSize(dataBaseFileName, indexBaseFileName string) (datSize int64, err error) {
- version, err := readEcVolumeVersion(baseFileName)
+ version, err := readEcVolumeVersion(dataBaseFileName)
if err != nil {
- return 0, fmt.Errorf("read ec volume %s version: %v", baseFileName, err)
+ return 0, fmt.Errorf("read ec volume %s version: %v", dataBaseFileName, err)
}
- err = iterateEcxFile(baseFileName, func(key types.NeedleId, offset types.Offset, size types.Size) error {
+ err = iterateEcxFile(indexBaseFileName, func(key types.NeedleId, offset types.Offset, size types.Size) error {
if size.IsDeleted() {
return nil
diff --git a/weed/storage/erasure_coding/ec_volume.go b/weed/storage/erasure_coding/ec_volume.go
index 71fe884df..2183e43d6 100644
--- a/weed/storage/erasure_coding/ec_volume.go
+++ b/weed/storage/erasure_coding/ec_volume.go
@@ -25,6 +25,7 @@ type EcVolume struct {
VolumeId needle.VolumeId
Collection string
dir string
+ dirIdx string
ecxFile *os.File
ecxFileSize int64
ecxCreatedAt time.Time
@@ -37,33 +38,34 @@ type EcVolume struct {
ecjFileAccessLock sync.Mutex
}
-func NewEcVolume(dir string, collection string, vid needle.VolumeId) (ev *EcVolume, err error) {
- ev = &EcVolume{dir: dir, Collection: collection, VolumeId: vid}
+func NewEcVolume(dir string, dirIdx string, collection string, vid needle.VolumeId) (ev *EcVolume, err error) {
+ ev = &EcVolume{dir: dir, dirIdx: dirIdx, Collection: collection, VolumeId: vid}
- baseFileName := EcShardFileName(collection, dir, int(vid))
+ dataBaseFileName := EcShardFileName(collection, dir, int(vid))
+ indexBaseFileName := EcShardFileName(collection, dirIdx, int(vid))
// open ecx file
- if ev.ecxFile, err = os.OpenFile(baseFileName+".ecx", os.O_RDWR, 0644); err != nil {
- return nil, fmt.Errorf("cannot open ec volume index %s.ecx: %v", baseFileName, err)
+ if ev.ecxFile, err = os.OpenFile(indexBaseFileName+".ecx", os.O_RDWR, 0644); err != nil {
+ return nil, fmt.Errorf("cannot open ec volume index %s.ecx: %v", indexBaseFileName, err)
}
ecxFi, statErr := ev.ecxFile.Stat()
if statErr != nil {
- return nil, fmt.Errorf("can not stat ec volume index %s.ecx: %v", baseFileName, statErr)
+ return nil, fmt.Errorf("can not stat ec volume index %s.ecx: %v", indexBaseFileName, statErr)
}
ev.ecxFileSize = ecxFi.Size()
ev.ecxCreatedAt = ecxFi.ModTime()
// open ecj file
- if ev.ecjFile, err = os.OpenFile(baseFileName+".ecj", os.O_RDWR|os.O_CREATE, 0644); err != nil {
- return nil, fmt.Errorf("cannot open ec volume journal %s.ecj: %v", baseFileName, err)
+ if ev.ecjFile, err = os.OpenFile(indexBaseFileName+".ecj", os.O_RDWR|os.O_CREATE, 0644); err != nil {
+ return nil, fmt.Errorf("cannot open ec volume journal %s.ecj: %v", indexBaseFileName, err)
}
// read volume info
ev.Version = needle.Version3
- if volumeInfo, found, _ := pb.MaybeLoadVolumeInfo(baseFileName + ".vif"); found {
+ if volumeInfo, found, _ := pb.MaybeLoadVolumeInfo(dataBaseFileName + ".vif"); found {
ev.Version = needle.Version(volumeInfo.Version)
} else {
- pb.SaveVolumeInfo(baseFileName+".vif", &volume_server_pb.VolumeInfo{Version: uint32(ev.Version)})
+ pb.SaveVolumeInfo(dataBaseFileName+".vif", &volume_server_pb.VolumeInfo{Version: uint32(ev.Version)})
}
ev.ShardLocations = make(map[ShardId][]string)
@@ -134,15 +136,26 @@ func (ev *EcVolume) Destroy() {
for _, s := range ev.Shards {
s.Destroy()
}
- os.Remove(ev.FileName() + ".ecx")
- os.Remove(ev.FileName() + ".ecj")
- os.Remove(ev.FileName() + ".vif")
+ os.Remove(ev.FileName(".ecx"))
+ os.Remove(ev.FileName(".ecj"))
+ os.Remove(ev.FileName(".vif"))
}
-func (ev *EcVolume) FileName() string {
+func (ev *EcVolume) FileName(ext string) string {
+ switch ext {
+ case ".ecx", ".ecj":
+ return ev.IndexBaseFileName() + ext
+ }
+ // .vif
+ return ev.DataBaseFileName() + ext
+}
+func (ev *EcVolume) DataBaseFileName() string {
return EcShardFileName(ev.Collection, ev.dir, int(ev.VolumeId))
+}
+func (ev *EcVolume) IndexBaseFileName() string {
+ return EcShardFileName(ev.Collection, ev.dirIdx, int(ev.VolumeId))
}
func (ev *EcVolume) ShardSize() uint64 {
diff --git a/weed/storage/needle_map_sorted_file.go b/weed/storage/needle_map_sorted_file.go
index 1ca113ca9..3449ff9dc 100644
--- a/weed/storage/needle_map_sorted_file.go
+++ b/weed/storage/needle_map_sorted_file.go
@@ -16,18 +16,18 @@ type SortedFileNeedleMap struct {
dbFileSize int64
}
-func NewSortedFileNeedleMap(baseFileName string, indexFile *os.File) (m *SortedFileNeedleMap, err error) {
- m = &SortedFileNeedleMap{baseFileName: baseFileName}
+func NewSortedFileNeedleMap(indexBaseFileName string, indexFile *os.File) (m *SortedFileNeedleMap, err error) {
+ m = &SortedFileNeedleMap{baseFileName: indexBaseFileName}
m.indexFile = indexFile
- fileName := baseFileName + ".sdx"
+ fileName := indexBaseFileName + ".sdx"
if !isSortedFileFresh(fileName, indexFile) {
glog.V(0).Infof("Start to Generate %s from %s", fileName, indexFile.Name())
- erasure_coding.WriteSortedFileFromIdx(baseFileName, ".sdx")
+ erasure_coding.WriteSortedFileFromIdx(indexBaseFileName, ".sdx")
glog.V(0).Infof("Finished Generating %s from %s", fileName, indexFile.Name())
}
glog.V(1).Infof("Opening %s...", fileName)
- if m.dbFile, err = os.Open(baseFileName + ".sdx"); err != nil {
+ if m.dbFile, err = os.Open(indexBaseFileName + ".sdx"); err != nil {
return
}
dbStat, _ := m.dbFile.Stat()
diff --git a/weed/storage/store.go b/weed/storage/store.go
index ce4e500d6..7e5768417 100644
--- a/weed/storage/store.go
+++ b/weed/storage/store.go
@@ -121,7 +121,7 @@ func (s *Store) addVolume(vid needle.VolumeId, collection string, needleMapKind
if location := s.FindFreeLocation(); location != nil {
glog.V(0).Infof("In dir %s adds volume:%v collection:%s replicaPlacement:%v ttl:%v",
location.Directory, vid, collection, replicaPlacement, ttl)
- if volume, err := NewVolume(location.Directory, collection, vid, needleMapKind, replicaPlacement, ttl, preallocate, memoryMapMaxSizeMb); err == nil {
+ if volume, err := NewVolume(location.Directory, location.IdxDirectory, collection, vid, needleMapKind, replicaPlacement, ttl, preallocate, memoryMapMaxSizeMb); err == nil {
location.SetVolume(vid, volume)
glog.V(0).Infof("add volume %d", vid)
s.NewVolumesChan <- master_pb.VolumeShortInformationMessage{
diff --git a/weed/storage/volume.go b/weed/storage/volume.go
index a7a963a59..a03846d3d 100644
--- a/weed/storage/volume.go
+++ b/weed/storage/volume.go
@@ -21,6 +21,7 @@ import (
type Volume struct {
Id needle.VolumeId
dir string
+ dirIdx string
Collection string
DataBackend backend.BackendStorageFile
nm NeedleMapper
@@ -47,9 +48,9 @@ type Volume struct {
location *DiskLocation
}
-func NewVolume(dirname string, collection string, id needle.VolumeId, needleMapKind NeedleMapType, replicaPlacement *super_block.ReplicaPlacement, ttl *needle.TTL, preallocate int64, memoryMapMaxSizeMb uint32) (v *Volume, e error) {
+func NewVolume(dirname string, dirIdx string, collection string, id needle.VolumeId, needleMapKind NeedleMapType, replicaPlacement *super_block.ReplicaPlacement, ttl *needle.TTL, preallocate int64, memoryMapMaxSizeMb uint32) (v *Volume, e error) {
// if replicaPlacement is nil, the superblock will be loaded from disk
- v = &Volume{dir: dirname, Collection: collection, Id: id, MemoryMapMaxSizeMb: memoryMapMaxSizeMb,
+ v = &Volume{dir: dirname, dirIdx: dirIdx, Collection: collection, Id: id, MemoryMapMaxSizeMb: memoryMapMaxSizeMb,
asyncRequestsChan: make(chan *needle.AsyncRequest, 128)}
v.SuperBlock = super_block.SuperBlock{ReplicaPlacement: replicaPlacement, Ttl: ttl}
v.needleMapKind = needleMapKind
@@ -61,7 +62,7 @@ func NewVolume(dirname string, collection string, id needle.VolumeId, needleMapK
func (v *Volume) String() string {
v.noWriteLock.RLock()
defer v.noWriteLock.RUnlock()
- return fmt.Sprintf("Id:%v, dir:%s, Collection:%s, dataFile:%v, nm:%v, noWrite:%v canDelete:%v", v.Id, v.dir, v.Collection, v.DataBackend, v.nm, v.noWriteOrDelete || v.noWriteCanDelete, v.noWriteCanDelete)
+ return fmt.Sprintf("Id:%v dir:%s dirIdx:%s Collection:%s dataFile:%v nm:%v noWrite:%v canDelete:%v", v.Id, v.dir, v.dirIdx, v.Collection, v.DataBackend, v.nm, v.noWriteOrDelete || v.noWriteCanDelete, v.noWriteCanDelete)
}
func VolumeFileName(dir string, collection string, id int) (fileName string) {
@@ -74,10 +75,23 @@ func VolumeFileName(dir string, collection string, id int) (fileName string) {
return
}
-func (v *Volume) FileName() (fileName string) {
+func (v *Volume) DataFileName() (fileName string) {
return VolumeFileName(v.dir, v.Collection, int(v.Id))
}
+func (v *Volume) IndexFileName() (fileName string) {
+ return VolumeFileName(v.dirIdx, v.Collection, int(v.Id))
+}
+
+func (v *Volume) FileName(ext string) (fileName string) {
+ switch ext {
+ case ".idx", ".cpx", ".ldb":
+ return VolumeFileName(v.dirIdx, v.Collection, int(v.Id)) + ext
+ }
+ // .dat, .cpd, .vif
+ return VolumeFileName(v.dir, v.Collection, int(v.Id)) + ext
+}
+
func (v *Volume) Version() needle.Version {
if v.volumeInfo.Version != 0 {
v.SuperBlock.Version = needle.Version(v.volumeInfo.Version)
diff --git a/weed/storage/volume_backup.go b/weed/storage/volume_backup.go
index 595bd8a35..62004d4da 100644
--- a/weed/storage/volume_backup.go
+++ b/weed/storage/volume_backup.go
@@ -124,9 +124,9 @@ func (v *Volume) findLastAppendAtNs() (uint64, error) {
}
func (v *Volume) locateLastAppendEntry() (Offset, error) {
- indexFile, e := os.OpenFile(v.FileName()+".idx", os.O_RDONLY, 0644)
+ indexFile, e := os.OpenFile(v.FileName(".idx"), os.O_RDONLY, 0644)
if e != nil {
- return Offset{}, fmt.Errorf("cannot read %s.idx: %v", v.FileName(), e)
+ return Offset{}, fmt.Errorf("cannot read %s: %v", v.FileName(".idx"), e)
}
defer indexFile.Close()
@@ -156,9 +156,9 @@ func (v *Volume) readAppendAtNs(offset Offset) (uint64, error) {
n, _, bodyLength, err := needle.ReadNeedleHeader(v.DataBackend, v.SuperBlock.Version, offset.ToAcutalOffset())
if err != nil {
- return 0, fmt.Errorf("ReadNeedleHeader: %v", err)
+ return 0, fmt.Errorf("ReadNeedleHeader %s [%d,%d): %v", v.DataBackend.Name(), offset.ToAcutalOffset(), offset.ToAcutalOffset()+NeedleHeaderSize, err)
}
- _, err = n.ReadNeedleBody(v.DataBackend, v.SuperBlock.Version, offset.ToAcutalOffset()+int64(NeedleHeaderSize), bodyLength)
+ _, err = n.ReadNeedleBody(v.DataBackend, v.SuperBlock.Version, offset.ToAcutalOffset()+NeedleHeaderSize, bodyLength)
if err != nil {
return 0, fmt.Errorf("ReadNeedleBody offset %d, bodyLength %d: %v", offset.ToAcutalOffset(), bodyLength, err)
}
@@ -168,9 +168,9 @@ func (v *Volume) readAppendAtNs(offset Offset) (uint64, error) {
// on server side
func (v *Volume) BinarySearchByAppendAtNs(sinceNs uint64) (offset Offset, isLast bool, err error) {
- indexFile, openErr := os.OpenFile(v.FileName()+".idx", os.O_RDONLY, 0644)
+ indexFile, openErr := os.OpenFile(v.FileName(".idx"), os.O_RDONLY, 0644)
if openErr != nil {
- err = fmt.Errorf("cannot read %s.idx: %v", v.FileName(), openErr)
+ err = fmt.Errorf("cannot read %s: %v", v.FileName(".idx"), openErr)
return
}
defer indexFile.Close()
diff --git a/weed/storage/volume_loading.go b/weed/storage/volume_loading.go
index 05684cbdb..fe4980e31 100644
--- a/weed/storage/volume_loading.go
+++ b/weed/storage/volume_loading.go
@@ -23,7 +23,6 @@ func loadVolumeWithoutIndex(dirname string, collection string, id needle.VolumeI
}
func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool, needleMapKind NeedleMapType, preallocate int64) (err error) {
- fileName := v.FileName()
alreadyHasSuperBlock := false
hasVolumeInfoFile := v.maybeLoadVolumeInfo() && v.volumeInfo.Version != 0
@@ -34,17 +33,17 @@ func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool, needleMapKind
glog.V(0).Infof("loading volume %d from remote %v", v.Id, v.volumeInfo.Files)
v.LoadRemoteFile()
alreadyHasSuperBlock = true
- } else if exists, canRead, canWrite, modifiedTime, fileSize := util.CheckFile(fileName + ".dat"); exists {
+ } else if exists, canRead, canWrite, modifiedTime, fileSize := util.CheckFile(v.FileName(".dat")); exists {
// open dat file
if !canRead {
- return fmt.Errorf("cannot read Volume Data file %s.dat", fileName)
+ return fmt.Errorf("cannot read Volume Data file %s", v.FileName(".dat"))
}
var dataFile *os.File
if canWrite {
- dataFile, err = os.OpenFile(fileName+".dat", os.O_RDWR|os.O_CREATE, 0644)
+ dataFile, err = os.OpenFile(v.FileName(".dat"), os.O_RDWR|os.O_CREATE, 0644)
} else {
- glog.V(0).Infoln("opening " + fileName + ".dat in READONLY mode")
- dataFile, err = os.Open(fileName + ".dat")
+ glog.V(0).Infof("opening %s in READONLY mode", v.FileName(".dat"))
+ dataFile, err = os.Open(v.FileName(".dat"))
v.noWriteOrDelete = true
}
v.lastModifiedTsSeconds = uint64(modifiedTime.Unix())
@@ -54,17 +53,17 @@ func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool, needleMapKind
v.DataBackend = backend.NewDiskFile(dataFile)
} else {
if createDatIfMissing {
- v.DataBackend, err = backend.CreateVolumeFile(fileName+".dat", preallocate, v.MemoryMapMaxSizeMb)
+ v.DataBackend, err = backend.CreateVolumeFile(v.FileName(".dat"), preallocate, v.MemoryMapMaxSizeMb)
} else {
- return fmt.Errorf("Volume Data file %s.dat does not exist.", fileName)
+ return fmt.Errorf("volume data file %s does not exist", v.FileName(".dat"))
}
}
if err != nil {
if !os.IsPermission(err) {
- return fmt.Errorf("cannot load Volume Data %s.dat: %v", fileName, err)
+ return fmt.Errorf("cannot load volume data %s: %v", v.FileName(".dat"), err)
} else {
- return fmt.Errorf("load data file %s.dat: %v", fileName, err)
+ return fmt.Errorf("load data file %s: %v", v.FileName(".dat"), err)
}
}
@@ -72,21 +71,27 @@ func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool, needleMapKind
err = v.readSuperBlock()
} else {
if !v.SuperBlock.Initialized() {
- return fmt.Errorf("volume %s.dat not initialized", fileName)
+ return fmt.Errorf("volume %s not initialized", v.FileName(".dat"))
}
err = v.maybeWriteSuperBlock()
}
if err == nil && alsoLoadIndex {
+ // adjust for existing volumes with .idx together with .dat files
+ if v.dirIdx != v.dir {
+ if util.FileExists(v.DataFileName()+".idx") {
+ v.dirIdx = v.dir
+ }
+ }
var indexFile *os.File
if v.noWriteOrDelete {
- glog.V(0).Infoln("open to read file", fileName+".idx")
- if indexFile, err = os.OpenFile(fileName+".idx", os.O_RDONLY, 0644); err != nil {
- return fmt.Errorf("cannot read Volume Index %s.idx: %v", fileName, err)
+ glog.V(0).Infoln("open to read file", v.FileName(".idx"))
+ if indexFile, err = os.OpenFile(v.FileName(".idx"), os.O_RDONLY, 0644); err != nil {
+ return fmt.Errorf("cannot read Volume Index %s: %v", v.FileName(".idx"), err)
}
} else {
- glog.V(1).Infoln("open to write file", fileName+".idx")
- if indexFile, err = os.OpenFile(fileName+".idx", os.O_RDWR|os.O_CREATE, 0644); err != nil {
- return fmt.Errorf("cannot write Volume Index %s.idx: %v", fileName, err)
+ glog.V(1).Infoln("open to write file", v.FileName(".idx"))
+ if indexFile, err = os.OpenFile(v.FileName(".idx"), os.O_RDWR|os.O_CREATE, 0644); err != nil {
+ return fmt.Errorf("cannot write Volume Index %s: %v", v.FileName(".idx"), err)
}
}
if v.lastAppendAtNs, err = CheckAndFixVolumeDataIntegrity(v, indexFile); err != nil {
@@ -95,45 +100,45 @@ func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool, needleMapKind
}
if v.noWriteOrDelete || v.noWriteCanDelete {
- if v.nm, err = NewSortedFileNeedleMap(fileName, indexFile); err != nil {
- glog.V(0).Infof("loading sorted db %s error: %v", fileName+".sdx", err)
+ if v.nm, err = NewSortedFileNeedleMap(v.IndexFileName(), indexFile); err != nil {
+ glog.V(0).Infof("loading sorted db %s error: %v", v.FileName(".sdx"), err)
}
} else {
switch needleMapKind {
case NeedleMapInMemory:
- glog.V(0).Infoln("loading index", fileName+".idx", "to memory")
+ glog.V(0).Infoln("loading index", v.FileName(".idx"), "to memory")
if v.nm, err = LoadCompactNeedleMap(indexFile); err != nil {
- glog.V(0).Infof("loading index %s to memory error: %v", fileName+".idx", err)
+ glog.V(0).Infof("loading index %s to memory error: %v", v.FileName(".idx"), err)
}
case NeedleMapLevelDb:
- glog.V(0).Infoln("loading leveldb", fileName+".ldb")
+ glog.V(0).Infoln("loading leveldb", v.FileName(".ldb"))
opts := &opt.Options{
BlockCacheCapacity: 2 * 1024 * 1024, // default value is 8MiB
WriteBuffer: 1 * 1024 * 1024, // default value is 4MiB
CompactionTableSizeMultiplier: 10, // default value is 1
}
- if v.nm, err = NewLevelDbNeedleMap(fileName+".ldb", indexFile, opts); err != nil {
- glog.V(0).Infof("loading leveldb %s error: %v", fileName+".ldb", err)
+ if v.nm, err = NewLevelDbNeedleMap(v.FileName(".ldb"), indexFile, opts); err != nil {
+ glog.V(0).Infof("loading leveldb %s error: %v", v.FileName(".ldb"), err)
}
case NeedleMapLevelDbMedium:
- glog.V(0).Infoln("loading leveldb medium", fileName+".ldb")
+ glog.V(0).Infoln("loading leveldb medium", v.FileName(".ldb"))
opts := &opt.Options{
BlockCacheCapacity: 4 * 1024 * 1024, // default value is 8MiB
WriteBuffer: 2 * 1024 * 1024, // default value is 4MiB
CompactionTableSizeMultiplier: 10, // default value is 1
}
- if v.nm, err = NewLevelDbNeedleMap(fileName+".ldb", indexFile, opts); err != nil {
- glog.V(0).Infof("loading leveldb %s error: %v", fileName+".ldb", err)
+ if v.nm, err = NewLevelDbNeedleMap(v.FileName(".ldb"), indexFile, opts); err != nil {
+ glog.V(0).Infof("loading leveldb %s error: %v", v.FileName(".ldb"), err)
}
case NeedleMapLevelDbLarge:
- glog.V(0).Infoln("loading leveldb large", fileName+".ldb")
+ glog.V(0).Infoln("loading leveldb large", v.FileName(".ldb"))
opts := &opt.Options{
BlockCacheCapacity: 8 * 1024 * 1024, // default value is 8MiB
WriteBuffer: 4 * 1024 * 1024, // default value is 4MiB
CompactionTableSizeMultiplier: 10, // default value is 1
}
- if v.nm, err = NewLevelDbNeedleMap(fileName+".ldb", indexFile, opts); err != nil {
- glog.V(0).Infof("loading leveldb %s error: %v", fileName+".ldb", err)
+ if v.nm, err = NewLevelDbNeedleMap(v.FileName(".ldb"), indexFile, opts); err != nil {
+ glog.V(0).Infof("loading leveldb %s error: %v", v.FileName(".ldb"), err)
}
}
}
diff --git a/weed/storage/volume_read_write.go b/weed/storage/volume_read_write.go
index 1a9beea7f..6dc4cb4a5 100644
--- a/weed/storage/volume_read_write.go
+++ b/weed/storage/volume_read_write.go
@@ -56,7 +56,8 @@ func (v *Volume) Destroy() (err error) {
}
}
v.Close()
- removeVolumeFiles(v.FileName())
+ removeVolumeFiles(v.DataFileName())
+ removeVolumeFiles(v.IndexFileName())
return
}
diff --git a/weed/storage/volume_tier.go b/weed/storage/volume_tier.go
index fd7b08654..77efd8a14 100644
--- a/weed/storage/volume_tier.go
+++ b/weed/storage/volume_tier.go
@@ -14,7 +14,7 @@ func (v *Volume) GetVolumeInfo() *volume_server_pb.VolumeInfo {
func (v *Volume) maybeLoadVolumeInfo() (found bool) {
- v.volumeInfo, v.hasRemoteFile, _ = pb.MaybeLoadVolumeInfo(v.FileName() + ".vif")
+ v.volumeInfo, v.hasRemoteFile, _ = pb.MaybeLoadVolumeInfo(v.FileName(".vif"))
if v.hasRemoteFile {
glog.V(0).Infof("volume %d is tiered to %s as %s and read only", v.Id,
@@ -43,7 +43,7 @@ func (v *Volume) LoadRemoteFile() error {
func (v *Volume) SaveVolumeInfo() error {
- tierFileName := v.FileName() + ".vif"
+ tierFileName := v.FileName(".vif")
return pb.SaveVolumeInfo(tierFileName, v.volumeInfo)
diff --git a/weed/storage/volume_vacuum.go b/weed/storage/volume_vacuum.go
index a3e5800df..f7b16b7db 100644
--- a/weed/storage/volume_vacuum.go
+++ b/weed/storage/volume_vacuum.go
@@ -49,7 +49,6 @@ func (v *Volume) Compact(preallocate int64, compactionBytePerSecond int64) error
v.isCompacting = false
}()
- filePath := v.FileName()
v.lastCompactIndexOffset = v.IndexFileSize()
v.lastCompactRevision = v.SuperBlock.CompactionRevision
glog.V(3).Infof("creating copies for volume %d ,last offset %d...", v.Id, v.lastCompactIndexOffset)
@@ -59,7 +58,7 @@ func (v *Volume) Compact(preallocate int64, compactionBytePerSecond int64) error
if err := v.nm.Sync(); err != nil {
glog.V(0).Infof("compact fail to sync volume idx %d", v.Id)
}
- return v.copyDataAndGenerateIndexFile(filePath+".cpd", filePath+".cpx", preallocate, compactionBytePerSecond)
+ return v.copyDataAndGenerateIndexFile(v.FileName(".cpd"), v.FileName(".cpx"), preallocate, compactionBytePerSecond)
}
// compact a volume based on deletions in .idx files
@@ -75,7 +74,6 @@ func (v *Volume) Compact2(preallocate int64, compactionBytePerSecond int64) erro
v.isCompacting = false
}()
- filePath := v.FileName()
v.lastCompactIndexOffset = v.IndexFileSize()
v.lastCompactRevision = v.SuperBlock.CompactionRevision
glog.V(3).Infof("creating copies for volume %d ...", v.Id)
@@ -85,7 +83,7 @@ func (v *Volume) Compact2(preallocate int64, compactionBytePerSecond int64) erro
if err := v.nm.Sync(); err != nil {
glog.V(0).Infof("compact2 fail to sync volume idx %d: %v", v.Id, err)
}
- return copyDataBasedOnIndexFile(filePath+".dat", filePath+".idx", filePath+".cpd", filePath+".cpx", v.SuperBlock, v.Version(), preallocate, compactionBytePerSecond)
+ return copyDataBasedOnIndexFile(v.FileName(".dat"), v.FileName(".idx"), v.FileName(".cpd"), v.FileName(".cpx"), v.SuperBlock, v.Version(), preallocate, compactionBytePerSecond)
}
func (v *Volume) CommitCompact() error {
@@ -113,40 +111,40 @@ func (v *Volume) CommitCompact() error {
stats.VolumeServerVolumeCounter.WithLabelValues(v.Collection, "volume").Dec()
var e error
- if e = v.makeupDiff(v.FileName()+".cpd", v.FileName()+".cpx", v.FileName()+".dat", v.FileName()+".idx"); e != nil {
+ if e = v.makeupDiff(v.FileName(".cpd"), v.FileName(".cpx"), v.FileName(".dat"), v.FileName(".idx")); e != nil {
glog.V(0).Infof("makeupDiff in CommitCompact volume %d failed %v", v.Id, e)
- e = os.Remove(v.FileName() + ".cpd")
+ e = os.Remove(v.FileName(".cpd"))
if e != nil {
return e
}
- e = os.Remove(v.FileName() + ".cpx")
+ e = os.Remove(v.FileName(".cpx"))
if e != nil {
return e
}
} else {
if runtime.GOOS == "windows" {
- e = os.RemoveAll(v.FileName() + ".dat")
+ e = os.RemoveAll(v.FileName(".dat"))
if e != nil {
return e
}
- e = os.RemoveAll(v.FileName() + ".idx")
+ e = os.RemoveAll(v.FileName(".idx"))
if e != nil {
return e
}
}
var e error
- if e = os.Rename(v.FileName()+".cpd", v.FileName()+".dat"); e != nil {
- return fmt.Errorf("rename %s: %v", v.FileName()+".cpd", e)
+ if e = os.Rename(v.FileName(".cpd"), v.FileName(".dat")); e != nil {
+ return fmt.Errorf("rename %s: %v", v.FileName(".cpd"), e)
}
- if e = os.Rename(v.FileName()+".cpx", v.FileName()+".idx"); e != nil {
- return fmt.Errorf("rename %s: %v", v.FileName()+".cpx", e)
+ if e = os.Rename(v.FileName(".cpx"), v.FileName(".idx")); e != nil {
+ return fmt.Errorf("rename %s: %v", v.FileName(".cpx"), e)
}
}
//glog.V(3).Infof("Pretending to be vacuuming...")
//time.Sleep(20 * time.Second)
- os.RemoveAll(v.FileName() + ".ldb")
+ os.RemoveAll(v.FileName(".ldb"))
glog.V(3).Infof("Loading volume %d commit file...", v.Id)
if e = v.load(true, false, v.needleMapKind, 0); e != nil {
@@ -158,8 +156,8 @@ func (v *Volume) CommitCompact() error {
func (v *Volume) cleanupCompact() error {
glog.V(0).Infof("Cleaning up volume %d vacuuming...", v.Id)
- e1 := os.Remove(v.FileName() + ".cpd")
- e2 := os.Remove(v.FileName() + ".cpx")
+ e1 := os.Remove(v.FileName(".cpd"))
+ e2 := os.Remove(v.FileName(".cpx"))
if e1 != nil {
return e1
}
diff --git a/weed/storage/volume_vacuum_test.go b/weed/storage/volume_vacuum_test.go
index f96e9b0cf..cd5a4f430 100644
--- a/weed/storage/volume_vacuum_test.go
+++ b/weed/storage/volume_vacuum_test.go
@@ -69,7 +69,7 @@ func TestCompaction(t *testing.T) {
}
defer os.RemoveAll(dir) // clean up
- v, err := NewVolume(dir, "", 1, NeedleMapInMemory, &super_block.ReplicaPlacement{}, &needle.TTL{}, 0, 0)
+ v, err := NewVolume(dir, dir, "", 1, NeedleMapInMemory, &super_block.ReplicaPlacement{}, &needle.TTL{}, 0, 0)
if err != nil {
t.Fatalf("volume creation: %v", err)
}
@@ -96,7 +96,7 @@ func TestCompaction(t *testing.T) {
v.Close()
- v, err = NewVolume(dir, "", 1, NeedleMapInMemory, nil, nil, 0, 0)
+ v, err = NewVolume(dir, dir, "", 1, NeedleMapInMemory, nil, nil, 0, 0)
if err != nil {
t.Fatalf("volume reloading: %v", err)
}