diff options
Diffstat (limited to 'go/storage')
| -rw-r--r-- | go/storage/needle_map.go | 70 | ||||
| -rw-r--r-- | go/storage/store.go | 20 | ||||
| -rw-r--r-- | go/storage/volume.go | 52 |
3 files changed, 115 insertions, 27 deletions
diff --git a/go/storage/needle_map.go b/go/storage/needle_map.go index c836a87fb..89773b341 100644 --- a/go/storage/needle_map.go +++ b/go/storage/needle_map.go @@ -52,37 +52,61 @@ const ( func LoadNeedleMap(file *os.File) (*NeedleMap, error) { nm := NewNeedleMap(file) - bufferReader := bufio.NewReaderSize(nm.indexFile, 1024*1024) - bytes := make([]byte, 16*RowsToRead) - count, e := bufferReader.Read(bytes) - for count > 0 && e == nil { - for i := 0; i < count; i += 16 { - key := util.BytesToUint64(bytes[i : i+8]) - offset := util.BytesToUint32(bytes[i+8 : i+12]) - size := util.BytesToUint32(bytes[i+12 : i+16]) - nm.FileCounter++ - nm.FileByteCounter = nm.FileByteCounter + uint64(size) - if offset > 0 { - oldSize := nm.m.Set(Key(key), offset, size) - //log.Println("reading key", key, "offset", offset, "size", size, "oldSize", oldSize) - if oldSize > 0 { - nm.DeletionCounter++ - nm.DeletionByteCounter = nm.DeletionByteCounter + uint64(oldSize) - } - } else { - oldSize := nm.m.Delete(Key(key)) - //log.Println("removing key", key, "offset", offset, "size", size, "oldSize", oldSize) + e := walkIndexFile(file, func(key uint64, offset, size uint32) error { + nm.FileCounter++ + nm.FileByteCounter = nm.FileByteCounter + uint64(size) + if offset > 0 { + oldSize := nm.m.Set(Key(key), offset, size) + //log.Println("reading key", key, "offset", offset, "size", size, "oldSize", oldSize) + if oldSize > 0 { nm.DeletionCounter++ nm.DeletionByteCounter = nm.DeletionByteCounter + uint64(oldSize) } + } else { + oldSize := nm.m.Delete(Key(key)) + //log.Println("removing key", key, "offset", offset, "size", size, "oldSize", oldSize) + nm.DeletionCounter++ + nm.DeletionByteCounter = nm.DeletionByteCounter + uint64(oldSize) } + return nil + }) + return nm, e +} + +// walks through the index file, calls fn function with each key, offset, size +// stops with the error returned by the fn function +func walkIndexFile(r io.Reader, fn func(key uint64, offset, size uint32) error) error { + br := bufio.NewReaderSize(r, 1024*1024) + bytes := make([]byte, 16*RowsToRead) + count, e := br.Read(bytes) + var ( + key uint64 + offset, size uint32 + i int + ) - count, e = bufferReader.Read(bytes) + for count > 0 && e == nil { + for i = 0; i+16 <= count; i += 16 { + key = util.BytesToUint64(bytes[i : i+8]) + offset = util.BytesToUint32(bytes[i+8 : i+12]) + size = util.BytesToUint32(bytes[i+12 : i+16]) + if e = fn(key, offset, size); e != nil { + return e + } + } + if count%16 != 0 { + copy(bytes[:count-i], bytes[i:count]) + i = count - i + count, e = br.Read(bytes[i:]) + count += i + } else { + count, e = br.Read(bytes) + } } if e == io.EOF { - e = nil + return nil } - return nm, e + return e } func (nm *NeedleMap) Put(key uint64, offset uint32, size uint32) (int, error) { diff --git a/go/storage/store.go b/go/storage/store.go index 954bae0ae..0889c9330 100644 --- a/go/storage/store.go +++ b/go/storage/store.go @@ -20,6 +20,8 @@ type Store struct { MaxVolumeCount int masterNode string + dataCenter string //optional informaton, overwriting master setting if exists + rack string //optional information, overwriting master setting if exists connected bool volumeSizeLimit uint64 //read from the master @@ -99,6 +101,16 @@ func (s *Store) CommitCompactVolume(volumeIdString string) error { } return s.volumes[vid].commitCompact() } +func (s *Store) FreezeVolume(volumeIdString string) error { + vid, err := NewVolumeId(volumeIdString) + if err != nil { + return errors.New("Volume Id " + volumeIdString + " is not a valid unsigned integer!") + } + if s.volumes[vid].readOnly { + return errors.New("Volume " + volumeIdString + " is already read-only") + } + return s.volumes[vid].freeze() +} func (s *Store) loadExistingVolumes() { if dirs, err := ioutil.ReadDir(s.dir); err == nil { for _, dir := range dirs { @@ -138,6 +150,12 @@ type JoinResult struct { func (s *Store) SetMaster(mserver string) { s.masterNode = mserver } +func (s *Store) SetDataCenter(dataCenter string) { + s.dataCenter = dataCenter +} +func (s *Store) SetRack(rack string) { + s.rack = rack +} func (s *Store) Join() error { stats := new([]*VolumeInfo) for k, v := range s.volumes { @@ -159,6 +177,8 @@ func (s *Store) Join() error { values.Add("publicUrl", s.PublicUrl) values.Add("volumes", string(bytes)) values.Add("maxVolumeCount", strconv.Itoa(s.MaxVolumeCount)) + values.Add("dataCenter", s.dataCenter) + values.Add("rack", s.rack) jsonBlob, err := util.Post("http://"+s.masterNode+"/dir/join", values) if err != nil { return err diff --git a/go/storage/volume.go b/go/storage/volume.go index 98f712433..4e6db3634 100644 --- a/go/storage/volume.go +++ b/go/storage/volume.go @@ -70,10 +70,29 @@ func (v *Volume) load(alsoLoadIndex bool) error { e = v.maybeWriteSuperBlock() } if e == nil && alsoLoadIndex { - indexFile, ie := os.OpenFile(fileName+".idx", os.O_RDWR|os.O_CREATE, 0644) - if ie != nil { - return fmt.Errorf("cannot create Volume Data %s.dat: %s", fileName, e) - } + var indexFile *os.File + if v.readOnly { + if indexFile, e = os.Open(fileName + ".idx"); e != nil && !os.IsNotExist(e) { + return fmt.Errorf("cannot open index file %s.idx: %s", fileName, e) + } + if indexFile != nil { + log.Printf("converting %s.idx to %s.cdb", fileName, fileName) + if e = ConvertIndexToCdb(fileName+".cdb", indexFile); e != nil { + log.Printf("error converting %s.idx to %s.cdb: %s", fileName, fileName) + } else { + indexFile.Close() + os.Remove(indexFile.Name()) + indexFile = nil + } + } + v.nm, e = OpenCdbMap(fileName + ".cdb") + return e + } else { + indexFile, e = os.OpenFile(fileName+".idx", os.O_RDWR|os.O_CREATE, 0644) + if e != nil { + return fmt.Errorf("cannot create Volume Data %s.dat: %s", fileName, e) + } + } v.nm, e = LoadNeedleMap(indexFile) } return e @@ -224,6 +243,31 @@ func (v *Volume) commitCompact() error { } return nil } +func (v *Volume) freeze() error { + if v.readOnly { + return nil + } + nm, ok := v.nm.(*NeedleMap) + if !ok { + return nil + } + v.accessLock.Lock() + defer v.accessLock.Unlock() + bn, _ := nakeFilename(v.dataFile.Name()) + cdbFn := bn + ".cdb" + log.Printf("converting %s to %s", nm.indexFile.Name(), cdbFn) + err := DumpNeedleMapToCdb(cdbFn, nm) + if err != nil { + return err + } + if v.nm, err = OpenCdbMap(cdbFn); err != nil { + return err + } + nm.indexFile.Close() + os.Remove(nm.indexFile.Name()) + v.readOnly = true + return nil +} func ScanVolumeFile(dirname string, id VolumeId, visitSuperBlock func(SuperBlock) error, |
