aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--weed/command/backup.go7
-rw-r--r--weed/command/compact.go2
-rw-r--r--weed/server/volume_grpc_admin.go4
-rw-r--r--weed/storage/disk_location.go2
-rw-r--r--weed/storage/idx_binary_search_test.go2
-rw-r--r--weed/storage/store.go8
-rw-r--r--weed/storage/volume.go4
-rw-r--r--weed/storage/volume_loading.go8
-rw-r--r--weed/storage/volume_read.go5
-rw-r--r--weed/storage/volume_read_test.go4
-rw-r--r--weed/storage/volume_super_block.go4
-rw-r--r--weed/storage/volume_vacuum.go2
-rw-r--r--weed/storage/volume_vacuum_test.go4
-rw-r--r--weed/storage/volume_write_test.go13
14 files changed, 38 insertions, 31 deletions
diff --git a/weed/command/backup.go b/weed/command/backup.go
index f9b9fba64..d5599372e 100644
--- a/weed/command/backup.go
+++ b/weed/command/backup.go
@@ -115,7 +115,10 @@ func runBackup(cmd *Command, args []string) bool {
return true
}
}
- v, err := storage.NewVolume(util.ResolvePath(*s.dir), util.ResolvePath(*s.dir), *s.collection, vid, storage.NeedleMapInMemory, replication, ttl, 0, 0, 0)
+
+ ver := needle.Version(stats.Version)
+
+ v, err := storage.NewVolume(util.ResolvePath(*s.dir), util.ResolvePath(*s.dir), *s.collection, vid, storage.NeedleMapInMemory, replication, ttl, 0, ver, 0, 0)
if err != nil {
fmt.Printf("Error creating or reading from volume %d: %v\n", vid, err)
return true
@@ -142,7 +145,7 @@ func runBackup(cmd *Command, args []string) bool {
fmt.Printf("Error destroying volume: %v\n", err)
}
// recreate an empty volume
- v, err = storage.NewVolume(util.ResolvePath(*s.dir), util.ResolvePath(*s.dir), *s.collection, vid, storage.NeedleMapInMemory, replication, ttl, 0, 0, 0)
+ v, err = storage.NewVolume(util.ResolvePath(*s.dir), util.ResolvePath(*s.dir), *s.collection, vid, storage.NeedleMapInMemory, replication, ttl, 0, ver, 0, 0)
if err != nil {
fmt.Printf("Error creating or reading from volume %d: %v\n", vid, err)
return true
diff --git a/weed/command/compact.go b/weed/command/compact.go
index 6f5f2307a..59e69bc74 100644
--- a/weed/command/compact.go
+++ b/weed/command/compact.go
@@ -41,7 +41,7 @@ func runCompact(cmd *Command, args []string) bool {
preallocate := *compactVolumePreallocate * (1 << 20)
vid := needle.VolumeId(*compactVolumeId)
- v, err := storage.NewVolume(util.ResolvePath(*compactVolumePath), util.ResolvePath(*compactVolumePath), *compactVolumeCollection, vid, storage.NeedleMapInMemory, nil, nil, preallocate, 0, 0)
+ v, err := storage.NewVolume(util.ResolvePath(*compactVolumePath), util.ResolvePath(*compactVolumePath), *compactVolumeCollection, vid, storage.NeedleMapInMemory, nil, nil, preallocate, needle.GetCurrentVersion(), 0, 0)
if err != nil {
glog.Fatalf("Load Volume [ERROR] %s\n", err)
}
diff --git a/weed/server/volume_grpc_admin.go b/weed/server/volume_grpc_admin.go
index dd13e3465..52e085684 100644
--- a/weed/server/volume_grpc_admin.go
+++ b/weed/server/volume_grpc_admin.go
@@ -3,10 +3,11 @@ package weed_server
import (
"context"
"fmt"
- "github.com/seaweedfs/seaweedfs/weed/util/version"
"path/filepath"
"time"
+ "github.com/seaweedfs/seaweedfs/weed/util/version"
+
"github.com/seaweedfs/seaweedfs/weed/storage"
"github.com/seaweedfs/seaweedfs/weed/cluster"
@@ -48,6 +49,7 @@ func (vs *VolumeServer) AllocateVolume(ctx context.Context, req *volume_server_p
req.Replication,
req.Ttl,
req.Preallocate,
+ needle.Version(req.Version),
req.MemoryMapMaxSizeMb,
types.ToDiskType(req.DiskType),
vs.ldbTimout,
diff --git a/weed/storage/disk_location.go b/weed/storage/disk_location.go
index cc89c4ca1..a3f0b6585 100644
--- a/weed/storage/disk_location.go
+++ b/weed/storage/disk_location.go
@@ -178,7 +178,7 @@ func (l *DiskLocation) loadExistingVolume(dirEntry os.DirEntry, needleMapKind Ne
}
// load the volume
- v, e := NewVolume(l.Directory, l.IdxDirectory, collection, vid, needleMapKind, nil, nil, 0, 0, ldbTimeout)
+ v, e := NewVolume(l.Directory, l.IdxDirectory, collection, vid, needleMapKind, nil, nil, 0, needle.GetCurrentVersion(), 0, ldbTimeout)
if e != nil {
glog.V(0).Infof("new volume %s error %s", volumeName, e)
return false
diff --git a/weed/storage/idx_binary_search_test.go b/weed/storage/idx_binary_search_test.go
index 0f26cdd02..e04185bcd 100644
--- a/weed/storage/idx_binary_search_test.go
+++ b/weed/storage/idx_binary_search_test.go
@@ -14,7 +14,7 @@ import (
func TestFirstInvalidIndex(t *testing.T) {
dir := t.TempDir()
- v, err := NewVolume(dir, dir, "", 1, NeedleMapInMemory, &super_block.ReplicaPlacement{}, &needle.TTL{}, 0, 0, 0)
+ v, err := NewVolume(dir, dir, "", 1, NeedleMapInMemory, &super_block.ReplicaPlacement{}, &needle.TTL{}, 0, needle.GetCurrentVersion(), 0, 0)
if err != nil {
t.Fatalf("volume creation: %v", err)
}
diff --git a/weed/storage/store.go b/weed/storage/store.go
index 9bfcae7ba..69bb5bc3b 100644
--- a/weed/storage/store.go
+++ b/weed/storage/store.go
@@ -107,7 +107,7 @@ func NewStore(grpcDialOption grpc.DialOption, ip string, port int, grpcPort int,
return
}
-func (s *Store) AddVolume(volumeId needle.VolumeId, collection string, needleMapKind NeedleMapKind, replicaPlacement string, ttlString string, preallocate int64, MemoryMapMaxSizeMb uint32, diskType DiskType, ldbTimeout int64) error {
+func (s *Store) AddVolume(volumeId needle.VolumeId, collection string, needleMapKind NeedleMapKind, replicaPlacement string, ttlString string, preallocate int64, ver needle.Version, MemoryMapMaxSizeMb uint32, diskType DiskType, ldbTimeout int64) error {
rt, e := super_block.NewReplicaPlacementFromString(replicaPlacement)
if e != nil {
return e
@@ -116,7 +116,7 @@ func (s *Store) AddVolume(volumeId needle.VolumeId, collection string, needleMap
if e != nil {
return e
}
- e = s.addVolume(volumeId, collection, needleMapKind, rt, ttl, preallocate, MemoryMapMaxSizeMb, diskType, ldbTimeout)
+ e = s.addVolume(volumeId, collection, needleMapKind, rt, ttl, preallocate, ver, MemoryMapMaxSizeMb, diskType, ldbTimeout)
return e
}
func (s *Store) DeleteCollection(collection string) (e error) {
@@ -159,7 +159,7 @@ func (s *Store) FindFreeLocation(filterFn func(location *DiskLocation) bool) (re
}
return ret
}
-func (s *Store) addVolume(vid needle.VolumeId, collection string, needleMapKind NeedleMapKind, replicaPlacement *super_block.ReplicaPlacement, ttl *needle.TTL, preallocate int64, memoryMapMaxSizeMb uint32, diskType DiskType, ldbTimeout int64) error {
+func (s *Store) addVolume(vid needle.VolumeId, collection string, needleMapKind NeedleMapKind, replicaPlacement *super_block.ReplicaPlacement, ttl *needle.TTL, preallocate int64, ver needle.Version, memoryMapMaxSizeMb uint32, diskType DiskType, ldbTimeout int64) error {
if s.findVolume(vid) != nil {
return fmt.Errorf("Volume Id %d already exists!", vid)
}
@@ -168,7 +168,7 @@ func (s *Store) addVolume(vid needle.VolumeId, collection string, needleMapKind
}); location != nil {
glog.V(0).Infof("In dir %s adds volume:%v collection:%s replicaPlacement:%v ttl:%v",
location.Directory, vid, collection, replicaPlacement, ttl)
- if volume, err := NewVolume(location.Directory, location.IdxDirectory, collection, vid, needleMapKind, replicaPlacement, ttl, preallocate, memoryMapMaxSizeMb, ldbTimeout); err == nil {
+ if volume, err := NewVolume(location.Directory, location.IdxDirectory, collection, vid, needleMapKind, replicaPlacement, ttl, preallocate, ver, memoryMapMaxSizeMb, ldbTimeout); err == nil {
location.SetVolume(vid, volume)
glog.V(0).Infof("add volume %d", vid)
s.NewVolumesChan <- master_pb.VolumeShortInformationMessage{
diff --git a/weed/storage/volume.go b/weed/storage/volume.go
index e55564652..b495b379d 100644
--- a/weed/storage/volume.go
+++ b/weed/storage/volume.go
@@ -55,14 +55,14 @@ type Volume struct {
lastIoError error
}
-func NewVolume(dirname string, dirIdx string, collection string, id needle.VolumeId, needleMapKind NeedleMapKind, replicaPlacement *super_block.ReplicaPlacement, ttl *needle.TTL, preallocate int64, memoryMapMaxSizeMb uint32, ldbTimeout int64) (v *Volume, e error) {
+func NewVolume(dirname string, dirIdx string, collection string, id needle.VolumeId, needleMapKind NeedleMapKind, replicaPlacement *super_block.ReplicaPlacement, ttl *needle.TTL, preallocate int64, ver needle.Version, memoryMapMaxSizeMb uint32, ldbTimeout int64) (v *Volume, e error) {
// if replicaPlacement is nil, the superblock will be loaded from disk
v = &Volume{dir: dirname, dirIdx: dirIdx, Collection: collection, Id: id, MemoryMapMaxSizeMb: memoryMapMaxSizeMb,
asyncRequestsChan: make(chan *needle.AsyncRequest, 128)}
v.SuperBlock = super_block.SuperBlock{ReplicaPlacement: replicaPlacement, Ttl: ttl}
v.needleMapKind = needleMapKind
v.ldbTimeout = ldbTimeout
- e = v.load(true, true, needleMapKind, preallocate)
+ e = v.load(true, true, needleMapKind, preallocate, ver)
v.startWorker()
return
}
diff --git a/weed/storage/volume_loading.go b/weed/storage/volume_loading.go
index 3334159ed..ca690618b 100644
--- a/weed/storage/volume_loading.go
+++ b/weed/storage/volume_loading.go
@@ -16,15 +16,15 @@ import (
"github.com/seaweedfs/seaweedfs/weed/util"
)
-func loadVolumeWithoutIndex(dirname string, collection string, id needle.VolumeId, needleMapKind NeedleMapKind) (v *Volume, err error) {
+func loadVolumeWithoutIndex(dirname string, collection string, id needle.VolumeId, needleMapKind NeedleMapKind, ver needle.Version) (v *Volume, err error) {
v = &Volume{dir: dirname, Collection: collection, Id: id}
v.SuperBlock = super_block.SuperBlock{}
v.needleMapKind = needleMapKind
- err = v.load(false, false, needleMapKind, 0)
+ err = v.load(false, false, needleMapKind, 0, ver)
return
}
-func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool, needleMapKind NeedleMapKind, preallocate int64) (err error) {
+func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool, needleMapKind NeedleMapKind, preallocate int64, ver needle.Version) (err error) {
alreadyHasSuperBlock := false
hasLoadedVolume := false
@@ -105,7 +105,7 @@ func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool, needleMapKind
if !v.SuperBlock.Initialized() {
return fmt.Errorf("volume %s not initialized", v.FileName(".dat"))
}
- err = v.maybeWriteSuperBlock()
+ err = v.maybeWriteSuperBlock(ver)
}
if err == nil && alsoLoadIndex {
// adjust for existing volumes with .idx together with .dat files
diff --git a/weed/storage/volume_read.go b/weed/storage/volume_read.go
index f82e3e72d..26aa8ac8a 100644
--- a/weed/storage/volume_read.go
+++ b/weed/storage/volume_read.go
@@ -2,10 +2,11 @@ package storage
import (
"fmt"
- "github.com/seaweedfs/seaweedfs/weed/util/mem"
"io"
"time"
+ "github.com/seaweedfs/seaweedfs/weed/util/mem"
+
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/stats"
"github.com/seaweedfs/seaweedfs/weed/storage/backend"
@@ -225,7 +226,7 @@ func ScanVolumeFile(dirname string, collection string, id needle.VolumeId,
needleMapKind NeedleMapKind,
volumeFileScanner VolumeFileScanner) (err error) {
var v *Volume
- if v, err = loadVolumeWithoutIndex(dirname, collection, id, needleMapKind); err != nil {
+ if v, err = loadVolumeWithoutIndex(dirname, collection, id, needleMapKind, needle.GetCurrentVersion()); err != nil {
return fmt.Errorf("failed to load volume %d: %v", id, err)
}
if err = volumeFileScanner.VisitSuperBlock(v.SuperBlock); err != nil {
diff --git a/weed/storage/volume_read_test.go b/weed/storage/volume_read_test.go
index 8bbd0058b..efedadc31 100644
--- a/weed/storage/volume_read_test.go
+++ b/weed/storage/volume_read_test.go
@@ -12,7 +12,7 @@ import (
func TestReadNeedMetaWithWritesAndUpdates(t *testing.T) {
dir := t.TempDir()
- v, err := NewVolume(dir, dir, "", 1, NeedleMapInMemory, &super_block.ReplicaPlacement{}, &needle.TTL{}, 0, 0, 0)
+ v, err := NewVolume(dir, dir, "", 1, NeedleMapInMemory, &super_block.ReplicaPlacement{}, &needle.TTL{}, 0, needle.GetCurrentVersion(), 0, 0)
if err != nil {
t.Fatalf("volume creation: %v", err)
}
@@ -51,7 +51,7 @@ func TestReadNeedMetaWithWritesAndUpdates(t *testing.T) {
func TestReadNeedMetaWithDeletesThenWrites(t *testing.T) {
dir := t.TempDir()
- v, err := NewVolume(dir, dir, "", 1, NeedleMapInMemory, &super_block.ReplicaPlacement{}, &needle.TTL{}, 0, 0, 0)
+ v, err := NewVolume(dir, dir, "", 1, NeedleMapInMemory, &super_block.ReplicaPlacement{}, &needle.TTL{}, 0, needle.GetCurrentVersion(), 0, 0)
if err != nil {
t.Fatalf("volume creation: %v", err)
}
diff --git a/weed/storage/volume_super_block.go b/weed/storage/volume_super_block.go
index 1d411471f..baf490af8 100644
--- a/weed/storage/volume_super_block.go
+++ b/weed/storage/volume_super_block.go
@@ -10,7 +10,7 @@ import (
"github.com/seaweedfs/seaweedfs/weed/storage/super_block"
)
-func (v *Volume) maybeWriteSuperBlock() error {
+func (v *Volume) maybeWriteSuperBlock(ver needle.Version) error {
datSize, _, e := v.DataBackend.GetStat()
if e != nil {
@@ -18,7 +18,7 @@ func (v *Volume) maybeWriteSuperBlock() error {
return e
}
if datSize == 0 {
- v.SuperBlock.Version = needle.GetCurrentVersion()
+ v.SuperBlock.Version = ver
_, e = v.DataBackend.WriteAt(v.SuperBlock.Bytes(), 0)
if e != nil && os.IsPermission(e) {
//read-only, but zero length - recreate it!
diff --git a/weed/storage/volume_vacuum.go b/weed/storage/volume_vacuum.go
index 9f277d4f5..1d6cdf9e0 100644
--- a/weed/storage/volume_vacuum.go
+++ b/weed/storage/volume_vacuum.go
@@ -175,7 +175,7 @@ func (v *Volume) CommitCompact() error {
os.RemoveAll(v.FileName(".ldb"))
glog.V(3).Infof("Loading volume %d commit file...", v.Id)
- if e = v.load(true, false, v.needleMapKind, 0); e != nil {
+ if e = v.load(true, false, v.needleMapKind, 0, v.Version()); e != nil {
return e
}
glog.V(3).Infof("Finish committing volume %d", v.Id)
diff --git a/weed/storage/volume_vacuum_test.go b/weed/storage/volume_vacuum_test.go
index 079b7ccf5..797452bb3 100644
--- a/weed/storage/volume_vacuum_test.go
+++ b/weed/storage/volume_vacuum_test.go
@@ -72,7 +72,7 @@ func TestLDBIndexCompaction(t *testing.T) {
func testCompaction(t *testing.T, needleMapKind NeedleMapKind) {
dir := t.TempDir()
- v, err := NewVolume(dir, dir, "", 1, needleMapKind, &super_block.ReplicaPlacement{}, &needle.TTL{}, 0, 0, 0)
+ v, err := NewVolume(dir, dir, "", 1, needleMapKind, &super_block.ReplicaPlacement{}, &needle.TTL{}, 0, needle.GetCurrentVersion(), 0, 0)
if err != nil {
t.Fatalf("volume creation: %v", err)
}
@@ -115,7 +115,7 @@ func testCompaction(t *testing.T, needleMapKind NeedleMapKind) {
v.Close()
- v, err = NewVolume(dir, dir, "", 1, needleMapKind, nil, nil, 0, 0, 0)
+ v, err = NewVolume(dir, dir, "", 1, needleMapKind, nil, nil, 0, needle.GetCurrentVersion(), 0, 0)
if err != nil {
t.Fatalf("volume reloading: %v", err)
}
diff --git a/weed/storage/volume_write_test.go b/weed/storage/volume_write_test.go
index 63815aecd..ce7c184bd 100644
--- a/weed/storage/volume_write_test.go
+++ b/weed/storage/volume_write_test.go
@@ -3,11 +3,12 @@ package storage
import (
"errors"
"fmt"
- "github.com/stretchr/testify/assert"
"os"
"testing"
"time"
+ "github.com/stretchr/testify/assert"
+
"github.com/seaweedfs/seaweedfs/weed/storage/needle"
"github.com/seaweedfs/seaweedfs/weed/storage/super_block"
"github.com/seaweedfs/seaweedfs/weed/storage/types"
@@ -16,7 +17,7 @@ import (
func TestSearchVolumesWithDeletedNeedles(t *testing.T) {
dir := t.TempDir()
- v, err := NewVolume(dir, dir, "", 1, NeedleMapInMemory, &super_block.ReplicaPlacement{}, &needle.TTL{}, 0, 0, 0)
+ v, err := NewVolume(dir, dir, "", 1, NeedleMapInMemory, &super_block.ReplicaPlacement{}, &needle.TTL{}, 0, needle.GetCurrentVersion(), 0, 0)
if err != nil {
t.Fatalf("volume creation: %v", err)
}
@@ -78,7 +79,7 @@ func assertFileExist(t *testing.T, expected bool, path string) {
func TestDestroyEmptyVolumeWithOnlyEmpty(t *testing.T) {
dir := t.TempDir()
- v, err := NewVolume(dir, dir, "", 1, NeedleMapInMemory, &super_block.ReplicaPlacement{}, &needle.TTL{}, 0, 0, 0)
+ v, err := NewVolume(dir, dir, "", 1, NeedleMapInMemory, &super_block.ReplicaPlacement{}, &needle.TTL{}, 0, needle.GetCurrentVersion(), 0, 0)
if err != nil {
t.Fatalf("volume creation: %v", err)
}
@@ -96,7 +97,7 @@ func TestDestroyEmptyVolumeWithOnlyEmpty(t *testing.T) {
func TestDestroyEmptyVolumeWithoutOnlyEmpty(t *testing.T) {
dir := t.TempDir()
- v, err := NewVolume(dir, dir, "", 1, NeedleMapInMemory, &super_block.ReplicaPlacement{}, &needle.TTL{}, 0, 0, 0)
+ v, err := NewVolume(dir, dir, "", 1, NeedleMapInMemory, &super_block.ReplicaPlacement{}, &needle.TTL{}, 0, needle.GetCurrentVersion(), 0, 0)
if err != nil {
t.Fatalf("volume creation: %v", err)
}
@@ -114,7 +115,7 @@ func TestDestroyEmptyVolumeWithoutOnlyEmpty(t *testing.T) {
func TestDestroyNonemptyVolumeWithOnlyEmpty(t *testing.T) {
dir := t.TempDir()
- v, err := NewVolume(dir, dir, "", 1, NeedleMapInMemory, &super_block.ReplicaPlacement{}, &needle.TTL{}, 0, 0, 0)
+ v, err := NewVolume(dir, dir, "", 1, NeedleMapInMemory, &super_block.ReplicaPlacement{}, &needle.TTL{}, 0, needle.GetCurrentVersion(), 0, 0)
if err != nil {
t.Fatalf("volume creation: %v", err)
}
@@ -144,7 +145,7 @@ func TestDestroyNonemptyVolumeWithOnlyEmpty(t *testing.T) {
func TestDestroyNonemptyVolumeWithoutOnlyEmpty(t *testing.T) {
dir := t.TempDir()
- v, err := NewVolume(dir, dir, "", 1, NeedleMapInMemory, &super_block.ReplicaPlacement{}, &needle.TTL{}, 0, 0, 0)
+ v, err := NewVolume(dir, dir, "", 1, NeedleMapInMemory, &super_block.ReplicaPlacement{}, &needle.TTL{}, 0, needle.GetCurrentVersion(), 0, 0)
if err != nil {
t.Fatalf("volume creation: %v", err)
}