aboutsummaryrefslogtreecommitdiff
path: root/go/storage
diff options
context:
space:
mode:
Diffstat (limited to 'go/storage')
-rw-r--r--go/storage/store.go18
-rw-r--r--go/storage/volume.go26
-rw-r--r--go/storage/volume_info.go1
3 files changed, 36 insertions, 9 deletions
diff --git a/go/storage/store.go b/go/storage/store.go
index 142ee84e2..ccb6a89fb 100644
--- a/go/storage/store.go
+++ b/go/storage/store.go
@@ -120,9 +120,10 @@ func (s *Store) loadExistingVolumes() {
func (s *Store) Status() []*VolumeInfo {
var stats []*VolumeInfo
for k, v := range s.volumes {
- s := new(VolumeInfo)
- s.Id, s.Size, s.RepType, s.Version, s.FileCount, s.DeleteCount, s.DeletedByteCount =
- VolumeId(k), v.ContentSize(), v.ReplicaType, v.Version(), v.nm.fileCounter, v.nm.deletionCounter, v.nm.deletionByteCounter
+ s := &VolumeInfo{Id: VolumeId(k), Size: v.ContentSize(),
+ RepType: v.ReplicaType, Version: v.Version(), FileCount: v.nm.fileCounter,
+ DeleteCount: v.nm.deletionCounter, DeletedByteCount: v.nm.deletionByteCounter,
+ ReadOnly: v.readOnly}
stats = append(stats, s)
}
return stats
@@ -138,9 +139,10 @@ func (s *Store) SetMaster(mserver string) {
func (s *Store) Join() error {
stats := new([]*VolumeInfo)
for k, v := range s.volumes {
- s := new(VolumeInfo)
- s.Id, s.Size, s.RepType, s.Version, s.FileCount, s.DeleteCount, s.DeletedByteCount =
- VolumeId(k), uint64(v.Size()), v.ReplicaType, v.Version(), v.nm.fileCounter, v.nm.deletionCounter, v.nm.deletionByteCounter
+ s := &VolumeInfo{Id: VolumeId(k), Size: uint64(v.Size()),
+ RepType: v.ReplicaType, Version: v.Version(), FileCount: v.nm.fileCounter,
+ DeleteCount: v.nm.deletionCounter, DeletedByteCount: v.nm.deletionByteCounter,
+ ReadOnly: v.readOnly}
*stats = append(*stats, s)
}
bytes, _ := json.Marshal(stats)
@@ -171,7 +173,7 @@ func (s *Store) Close() {
}
}
func (s *Store) Write(i VolumeId, n *Needle) (size uint32, err error) {
- if v := s.volumes[i]; v != nil {
+ if v := s.volumes[i]; v != nil && !v.readOnly {
size, err = v.write(n)
if err != nil && s.volumeSizeLimit < v.ContentSize()+uint64(size) && s.volumeSizeLimit >= v.ContentSize() {
log.Println("volume", i, "size is", v.ContentSize(), "close to", s.volumeSizeLimit)
@@ -185,7 +187,7 @@ func (s *Store) Write(i VolumeId, n *Needle) (size uint32, err error) {
return
}
func (s *Store) Delete(i VolumeId, n *Needle) (uint32, error) {
- if v := s.volumes[i]; v != nil {
+ if v := s.volumes[i]; v != nil && !v.readOnly {
return v.delete(n)
}
return 0, nil
diff --git a/go/storage/volume.go b/go/storage/volume.go
index b9c7484d7..42a931a6d 100644
--- a/go/storage/volume.go
+++ b/go/storage/volume.go
@@ -4,6 +4,7 @@ import (
"errors"
"fmt"
"io"
+ "log"
"os"
"path"
"sync"
@@ -30,6 +31,7 @@ type Volume struct {
dir string
dataFile *os.File
nm *NeedleMap
+ readOnly bool
SuperBlock
@@ -53,7 +55,14 @@ func (v *Volume) load(alsoLoadIndex bool) error {
fileName := path.Join(v.dir, v.Id.String())
v.dataFile, e = os.OpenFile(fileName+".dat", os.O_RDWR|os.O_CREATE, 0644)
if e != nil {
- return fmt.Errorf("cannot create Volume Data %s.dat: %s", fileName, e)
+ if !os.IsPermission(e) {
+ return fmt.Errorf("cannot create Volume Data %s.dat: %s", fileName, e)
+ }
+ if v.dataFile, e = os.Open(fileName + ".dat"); e != nil {
+ return fmt.Errorf("cannot open Volume Data %s.dat: %s", fileName, e)
+ }
+ log.Printf("opening " + fileName + ".dat in READONLY mode")
+ v.readOnly = true
}
if v.ReplicaType == CopyNil {
e = v.readSuperBlock()
@@ -97,6 +106,14 @@ func (v *Volume) maybeWriteSuperBlock() error {
if stat.Size() == 0 {
v.SuperBlock.Version = CurrentVersion
_, e = v.dataFile.Write(v.SuperBlock.Bytes())
+ if e != nil && os.IsPermission(e) {
+ //read-only, but zero length - recreate it!
+ if v.dataFile, e = os.Create(v.dataFile.Name()); e == nil {
+ if _, e = v.dataFile.Write(v.SuperBlock.Bytes()); e == nil {
+ v.readOnly = false
+ }
+ }
+ }
}
return e
}
@@ -123,6 +140,10 @@ func (v *Volume) NeedToReplicate() bool {
}
func (v *Volume) write(n *Needle) (size uint32, err error) {
+ if v.readOnly {
+ err = fmt.Errorf("%s is read-only", v.dataFile)
+ return
+ }
v.accessLock.Lock()
defer v.accessLock.Unlock()
var offset int64
@@ -142,6 +163,9 @@ func (v *Volume) write(n *Needle) (size uint32, err error) {
return
}
func (v *Volume) delete(n *Needle) (uint32, error) {
+ if v.readOnly {
+ return 0, fmt.Errorf("%s is read-only", v.dataFile)
+ }
v.accessLock.Lock()
defer v.accessLock.Unlock()
nv, ok := v.nm.Get(n.Id)
diff --git a/go/storage/volume_info.go b/go/storage/volume_info.go
index e4c5f6ec4..5a83b6e36 100644
--- a/go/storage/volume_info.go
+++ b/go/storage/volume_info.go
@@ -10,4 +10,5 @@ type VolumeInfo struct {
FileCount int
DeleteCount int
DeletedByteCount uint64
+ ReadOnly bool
}