aboutsummaryrefslogtreecommitdiff
path: root/weed/storage/store.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/storage/store.go')
-rw-r--r--weed/storage/store.go63
1 files changed, 47 insertions, 16 deletions
diff --git a/weed/storage/store.go b/weed/storage/store.go
index 4d1061bed..e29680f6f 100644
--- a/weed/storage/store.go
+++ b/weed/storage/store.go
@@ -2,14 +2,19 @@ package storage
import (
"fmt"
+ "path/filepath"
+ "strings"
"sync/atomic"
+ "google.golang.org/grpc"
+
"github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
"github.com/chrislusf/seaweedfs/weed/stats"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
+ "github.com/chrislusf/seaweedfs/weed/storage/super_block"
. "github.com/chrislusf/seaweedfs/weed/storage/types"
- "google.golang.org/grpc"
)
const (
@@ -60,7 +65,7 @@ func NewStore(grpcDialOption grpc.DialOption, port int, ip, publicUrl string, di
return
}
func (s *Store) AddVolume(volumeId needle.VolumeId, collection string, needleMapKind NeedleMapType, replicaPlacement string, ttlString string, preallocate int64, MemoryMapMaxSizeMb uint32) error {
- rt, e := NewReplicaPlacementFromString(replicaPlacement)
+ rt, e := super_block.NewReplicaPlacementFromString(replicaPlacement)
if e != nil {
return e
}
@@ -101,7 +106,7 @@ func (s *Store) FindFreeLocation() (ret *DiskLocation) {
}
return ret
}
-func (s *Store) addVolume(vid needle.VolumeId, collection string, needleMapKind NeedleMapType, replicaPlacement *ReplicaPlacement, ttl *needle.TTL, preallocate int64, memoryMapMaxSizeMb uint32) error {
+func (s *Store) addVolume(vid needle.VolumeId, collection string, needleMapKind NeedleMapType, replicaPlacement *super_block.ReplicaPlacement, ttl *needle.TTL, preallocate int64, memoryMapMaxSizeMb uint32) error {
if s.findVolume(vid) != nil {
return fmt.Errorf("Volume Id %d already exists!", vid)
}
@@ -126,10 +131,10 @@ func (s *Store) addVolume(vid needle.VolumeId, collection string, needleMapKind
return fmt.Errorf("No more free space left")
}
-func (s *Store) Status() []*VolumeInfo {
+func (s *Store) VolumeInfos() []*VolumeInfo {
var stats []*VolumeInfo
for _, location := range s.Locations {
- location.RLock()
+ location.volumesLock.RLock()
for k, v := range location.volumes {
s := &VolumeInfo{
Id: needle.VolumeId(k),
@@ -140,13 +145,14 @@ func (s *Store) Status() []*VolumeInfo {
FileCount: int(v.FileCount()),
DeleteCount: int(v.DeletedCount()),
DeletedByteCount: v.DeletedSize(),
- ReadOnly: v.readOnly,
+ ReadOnly: v.noWriteOrDelete || v.noWriteCanDelete,
Ttl: v.Ttl,
CompactRevision: uint32(v.CompactionRevision),
}
+ s.RemoteStorageName, s.RemoteStorageKey = v.RemoteStorageNameKey()
stats = append(stats, s)
}
- location.RUnlock()
+ location.volumesLock.RUnlock()
}
sortVolumeInfos(stats)
return stats
@@ -167,7 +173,7 @@ func (s *Store) CollectHeartbeat() *master_pb.Heartbeat {
for _, location := range s.Locations {
var deleteVids []needle.VolumeId
maxVolumeCount = maxVolumeCount + location.MaxVolumeCount
- location.RLock()
+ location.volumesLock.RLock()
for _, v := range location.volumes {
if maxFileKey < v.MaxFileKey() {
maxFileKey = v.MaxFileKey()
@@ -184,16 +190,16 @@ func (s *Store) CollectHeartbeat() *master_pb.Heartbeat {
fileSize, _, _ := v.FileStat()
collectionVolumeSize[v.Collection] += fileSize
}
- location.RUnlock()
+ location.volumesLock.RUnlock()
if len(deleteVids) > 0 {
// delete expired volumes.
- location.Lock()
+ location.volumesLock.Lock()
for _, vid := range deleteVids {
location.deleteVolumeById(vid)
glog.V(0).Infoln("volume", vid, "is deleted.")
}
- location.Unlock()
+ location.volumesLock.Unlock()
}
}
@@ -223,11 +229,11 @@ func (s *Store) Close() {
func (s *Store) WriteVolumeNeedle(i needle.VolumeId, n *needle.Needle) (size uint32, isUnchanged bool, err error) {
if v := s.findVolume(i); v != nil {
- if v.readOnly {
+ if v.noWriteOrDelete || v.noWriteCanDelete {
err = fmt.Errorf("volume %d is read only", i)
return
}
- if MaxPossibleVolumeSize >= v.ContentSize()+uint64(needle.GetActualSize(size, v.version)) {
+ if MaxPossibleVolumeSize >= v.ContentSize()+uint64(needle.GetActualSize(size, v.Version())) {
_, size, isUnchanged, err = v.writeNeedle(n)
} else {
err = fmt.Errorf("volume size limit %d exceeded! current size is %d", s.GetVolumeSizeLimit(), v.ContentSize())
@@ -241,10 +247,10 @@ func (s *Store) WriteVolumeNeedle(i needle.VolumeId, n *needle.Needle) (size uin
func (s *Store) DeleteVolumeNeedle(i needle.VolumeId, n *needle.Needle) (uint32, error) {
if v := s.findVolume(i); v != nil {
- if v.readOnly {
+ if v.noWriteOrDelete {
return 0, fmt.Errorf("volume %d is read only", i)
}
- if MaxPossibleVolumeSize >= v.ContentSize()+uint64(needle.GetActualSize(0, v.version)) {
+ if MaxPossibleVolumeSize >= v.ContentSize()+uint64(needle.GetActualSize(0, v.Version())) {
return v.deleteNeedle(n)
} else {
return 0, fmt.Errorf("volume size limit %d exceeded! current size is %d", s.GetVolumeSizeLimit(), v.ContentSize())
@@ -273,7 +279,7 @@ func (s *Store) MarkVolumeReadonly(i needle.VolumeId) error {
if v == nil {
return fmt.Errorf("volume %d not found", i)
}
- v.readOnly = true
+ v.noWriteOrDelete = true
return nil
}
@@ -343,6 +349,31 @@ func (s *Store) DeleteVolume(i needle.VolumeId) error {
return fmt.Errorf("volume %d not found on disk", i)
}
+func (s *Store) ConfigureVolume(i needle.VolumeId, replication string) error {
+
+ for _, location := range s.Locations {
+ fileInfo, found := location.LocateVolume(i)
+ if !found {
+ continue
+ }
+ // load, modify, save
+ baseFileName := strings.TrimSuffix(fileInfo.Name(), filepath.Ext(fileInfo.Name()))
+ vifFile := filepath.Join(location.Directory, baseFileName+".vif")
+ volumeInfo, _, err := pb.MaybeLoadVolumeInfo(vifFile)
+ if err != nil {
+ return fmt.Errorf("volume %d fail to load vif", i)
+ }
+ volumeInfo.Replication = replication
+ err = pb.SaveVolumeInfo(vifFile, volumeInfo)
+ if err != nil {
+ return fmt.Errorf("volume %d fail to save vif", i)
+ }
+ return nil
+ }
+
+ return fmt.Errorf("volume %d not found on disk", i)
+}
+
func (s *Store) SetVolumeSizeLimit(x uint64) {
atomic.StoreUint64(&s.volumeSizeLimit, x)
}