aboutsummaryrefslogtreecommitdiff
path: root/go/storage
diff options
context:
space:
mode:
Diffstat (limited to 'go/storage')
-rw-r--r--go/storage/needle_map.go70
-rw-r--r--go/storage/store.go20
-rw-r--r--go/storage/volume.go52
3 files changed, 115 insertions, 27 deletions
diff --git a/go/storage/needle_map.go b/go/storage/needle_map.go
index c836a87fb..89773b341 100644
--- a/go/storage/needle_map.go
+++ b/go/storage/needle_map.go
@@ -52,37 +52,61 @@ const (
func LoadNeedleMap(file *os.File) (*NeedleMap, error) {
nm := NewNeedleMap(file)
- bufferReader := bufio.NewReaderSize(nm.indexFile, 1024*1024)
- bytes := make([]byte, 16*RowsToRead)
- count, e := bufferReader.Read(bytes)
- for count > 0 && e == nil {
- for i := 0; i < count; i += 16 {
- key := util.BytesToUint64(bytes[i : i+8])
- offset := util.BytesToUint32(bytes[i+8 : i+12])
- size := util.BytesToUint32(bytes[i+12 : i+16])
- nm.FileCounter++
- nm.FileByteCounter = nm.FileByteCounter + uint64(size)
- if offset > 0 {
- oldSize := nm.m.Set(Key(key), offset, size)
- //log.Println("reading key", key, "offset", offset, "size", size, "oldSize", oldSize)
- if oldSize > 0 {
- nm.DeletionCounter++
- nm.DeletionByteCounter = nm.DeletionByteCounter + uint64(oldSize)
- }
- } else {
- oldSize := nm.m.Delete(Key(key))
- //log.Println("removing key", key, "offset", offset, "size", size, "oldSize", oldSize)
+ e := walkIndexFile(file, func(key uint64, offset, size uint32) error {
+ nm.FileCounter++
+ nm.FileByteCounter = nm.FileByteCounter + uint64(size)
+ if offset > 0 {
+ oldSize := nm.m.Set(Key(key), offset, size)
+ //log.Println("reading key", key, "offset", offset, "size", size, "oldSize", oldSize)
+ if oldSize > 0 {
nm.DeletionCounter++
nm.DeletionByteCounter = nm.DeletionByteCounter + uint64(oldSize)
}
+ } else {
+ oldSize := nm.m.Delete(Key(key))
+ //log.Println("removing key", key, "offset", offset, "size", size, "oldSize", oldSize)
+ nm.DeletionCounter++
+ nm.DeletionByteCounter = nm.DeletionByteCounter + uint64(oldSize)
}
+ return nil
+ })
+ return nm, e
+}
+
+// walks through the index file, calls fn function with each key, offset, size
+// stops with the error returned by the fn function
+func walkIndexFile(r io.Reader, fn func(key uint64, offset, size uint32) error) error {
+ br := bufio.NewReaderSize(r, 1024*1024)
+ bytes := make([]byte, 16*RowsToRead)
+ count, e := br.Read(bytes)
+ var (
+ key uint64
+ offset, size uint32
+ i int
+ )
- count, e = bufferReader.Read(bytes)
+ for count > 0 && e == nil {
+ for i = 0; i+16 <= count; i += 16 {
+ key = util.BytesToUint64(bytes[i : i+8])
+ offset = util.BytesToUint32(bytes[i+8 : i+12])
+ size = util.BytesToUint32(bytes[i+12 : i+16])
+ if e = fn(key, offset, size); e != nil {
+ return e
+ }
+ }
+ if count%16 != 0 {
+ copy(bytes[:count-i], bytes[i:count])
+ i = count - i
+ count, e = br.Read(bytes[i:])
+ count += i
+ } else {
+ count, e = br.Read(bytes)
+ }
}
if e == io.EOF {
- e = nil
+ return nil
}
- return nm, e
+ return e
}
func (nm *NeedleMap) Put(key uint64, offset uint32, size uint32) (int, error) {
diff --git a/go/storage/store.go b/go/storage/store.go
index 954bae0ae..0889c9330 100644
--- a/go/storage/store.go
+++ b/go/storage/store.go
@@ -20,6 +20,8 @@ type Store struct {
MaxVolumeCount int
masterNode string
+ dataCenter string //optional informaton, overwriting master setting if exists
+ rack string //optional information, overwriting master setting if exists
connected bool
volumeSizeLimit uint64 //read from the master
@@ -99,6 +101,16 @@ func (s *Store) CommitCompactVolume(volumeIdString string) error {
}
return s.volumes[vid].commitCompact()
}
+func (s *Store) FreezeVolume(volumeIdString string) error {
+ vid, err := NewVolumeId(volumeIdString)
+ if err != nil {
+ return errors.New("Volume Id " + volumeIdString + " is not a valid unsigned integer!")
+ }
+ if s.volumes[vid].readOnly {
+ return errors.New("Volume " + volumeIdString + " is already read-only")
+ }
+ return s.volumes[vid].freeze()
+}
func (s *Store) loadExistingVolumes() {
if dirs, err := ioutil.ReadDir(s.dir); err == nil {
for _, dir := range dirs {
@@ -138,6 +150,12 @@ type JoinResult struct {
func (s *Store) SetMaster(mserver string) {
s.masterNode = mserver
}
+func (s *Store) SetDataCenter(dataCenter string) {
+ s.dataCenter = dataCenter
+}
+func (s *Store) SetRack(rack string) {
+ s.rack = rack
+}
func (s *Store) Join() error {
stats := new([]*VolumeInfo)
for k, v := range s.volumes {
@@ -159,6 +177,8 @@ func (s *Store) Join() error {
values.Add("publicUrl", s.PublicUrl)
values.Add("volumes", string(bytes))
values.Add("maxVolumeCount", strconv.Itoa(s.MaxVolumeCount))
+ values.Add("dataCenter", s.dataCenter)
+ values.Add("rack", s.rack)
jsonBlob, err := util.Post("http://"+s.masterNode+"/dir/join", values)
if err != nil {
return err
diff --git a/go/storage/volume.go b/go/storage/volume.go
index 98f712433..4e6db3634 100644
--- a/go/storage/volume.go
+++ b/go/storage/volume.go
@@ -70,10 +70,29 @@ func (v *Volume) load(alsoLoadIndex bool) error {
e = v.maybeWriteSuperBlock()
}
if e == nil && alsoLoadIndex {
- indexFile, ie := os.OpenFile(fileName+".idx", os.O_RDWR|os.O_CREATE, 0644)
- if ie != nil {
- return fmt.Errorf("cannot create Volume Data %s.dat: %s", fileName, e)
- }
+ var indexFile *os.File
+ if v.readOnly {
+ if indexFile, e = os.Open(fileName + ".idx"); e != nil && !os.IsNotExist(e) {
+ return fmt.Errorf("cannot open index file %s.idx: %s", fileName, e)
+ }
+ if indexFile != nil {
+ log.Printf("converting %s.idx to %s.cdb", fileName, fileName)
+ if e = ConvertIndexToCdb(fileName+".cdb", indexFile); e != nil {
+ log.Printf("error converting %s.idx to %s.cdb: %s", fileName, fileName)
+ } else {
+ indexFile.Close()
+ os.Remove(indexFile.Name())
+ indexFile = nil
+ }
+ }
+ v.nm, e = OpenCdbMap(fileName + ".cdb")
+ return e
+ } else {
+ indexFile, e = os.OpenFile(fileName+".idx", os.O_RDWR|os.O_CREATE, 0644)
+ if e != nil {
+ return fmt.Errorf("cannot create Volume Data %s.dat: %s", fileName, e)
+ }
+ }
v.nm, e = LoadNeedleMap(indexFile)
}
return e
@@ -224,6 +243,31 @@ func (v *Volume) commitCompact() error {
}
return nil
}
+func (v *Volume) freeze() error {
+ if v.readOnly {
+ return nil
+ }
+ nm, ok := v.nm.(*NeedleMap)
+ if !ok {
+ return nil
+ }
+ v.accessLock.Lock()
+ defer v.accessLock.Unlock()
+ bn, _ := nakeFilename(v.dataFile.Name())
+ cdbFn := bn + ".cdb"
+ log.Printf("converting %s to %s", nm.indexFile.Name(), cdbFn)
+ err := DumpNeedleMapToCdb(cdbFn, nm)
+ if err != nil {
+ return err
+ }
+ if v.nm, err = OpenCdbMap(cdbFn); err != nil {
+ return err
+ }
+ nm.indexFile.Close()
+ os.Remove(nm.indexFile.Name())
+ v.readOnly = true
+ return nil
+}
func ScanVolumeFile(dirname string, id VolumeId,
visitSuperBlock func(SuperBlock) error,