diff options
Diffstat (limited to 'go/storage')
| -rw-r--r-- | go/storage/needle.go | 9 | ||||
| -rw-r--r-- | go/storage/needle_read_write.go | 21 | ||||
| -rw-r--r-- | go/storage/replica_placement.go | 8 | ||||
| -rw-r--r-- | go/storage/store.go | 67 | ||||
| -rw-r--r-- | go/storage/volume.go | 74 | ||||
| -rw-r--r-- | go/storage/volume_info.go | 2 | ||||
| -rw-r--r-- | go/storage/volume_super_block.go | 10 | ||||
| -rw-r--r-- | go/storage/volume_super_block_test.go | 7 | ||||
| -rw-r--r-- | go/storage/volume_ttl.go | 147 | ||||
| -rw-r--r-- | go/storage/volume_vacuum.go | 14 |
10 files changed, 311 insertions, 48 deletions
diff --git a/go/storage/needle.go b/go/storage/needle.go index 77aa70169..3bf627141 100644 --- a/go/storage/needle.go +++ b/go/storage/needle.go @@ -38,12 +38,13 @@ type Needle struct { MimeSize uint8 //version2 Mime []byte `comment:"maximum 256 characters"` //version2 LastModified uint64 //only store LastModifiedBytesLength bytes, which is 5 bytes to disk + Ttl *TTL Checksum CRC `comment:"CRC32 to check integrity"` Padding []byte `comment:"Aligned to 8 bytes"` } -func ParseUpload(r *http.Request) (fileName string, data []byte, mimeType string, isGzipped bool, modifiedTime uint64, e error) { +func ParseUpload(r *http.Request) (fileName string, data []byte, mimeType string, isGzipped bool, modifiedTime uint64, ttl *TTL, e error) { form, fe := r.MultipartReader() if fe != nil { glog.V(0).Infoln("MultipartReader [ERROR]", fe) @@ -92,12 +93,13 @@ func ParseUpload(r *http.Request) (fileName string, data []byte, mimeType string fileName = fileName[:len(fileName)-3] } modifiedTime, _ = strconv.ParseUint(r.FormValue("ts"), 10, 64) + ttl, _ = ReadTTL(r.FormValue("ttl")) return } func NewNeedle(r *http.Request, fixJpgOrientation bool) (n *Needle, e error) { fname, mimeType, isGzipped := "", "", false n = new(Needle) - fname, n.Data, mimeType, isGzipped, n.LastModified, e = ParseUpload(r) + fname, n.Data, mimeType, isGzipped, n.LastModified, n.Ttl, e = ParseUpload(r) if e != nil { return } @@ -116,6 +118,9 @@ func NewNeedle(r *http.Request, fixJpgOrientation bool) (n *Needle, e error) { n.LastModified = uint64(time.Now().Unix()) } n.SetHasLastModifiedDate() + if n.Ttl != nil { + n.SetHasTtl() + } if fixJpgOrientation { loweredName := strings.ToLower(fname) diff --git a/go/storage/needle_read_write.go b/go/storage/needle_read_write.go index 835d7c270..848121ff2 100644 --- a/go/storage/needle_read_write.go +++ b/go/storage/needle_read_write.go @@ -14,7 +14,9 @@ const ( FlagHasName = 0x02 FlagHasMime = 0x04 FlagHasLastModifiedDate = 0x08 + FlagHasTtl = 0x10 LastModifiedBytesLength = 5 + TtlBytesLength = 2 ) func (n *Needle) DiskSize() int64 { @@ -70,6 +72,9 @@ func (n *Needle) Append(w io.Writer, version Version) (size uint32, err error) { if n.HasLastModifiedDate() { n.Size = n.Size + LastModifiedBytesLength } + if n.HasTtl() { + n.Size = n.Size + TtlBytesLength + } } size = n.DataSize util.Uint32toBytes(header[12:16], n.Size) @@ -112,6 +117,12 @@ func (n *Needle) Append(w io.Writer, version Version) (size uint32, err error) { return } } + if n.HasTtl() { + n.Ttl.ToBytes(header[0:TtlBytesLength]) + if _, err = w.Write(header[0:TtlBytesLength]); err != nil { + return + } + } } padding := NeedlePaddingSize - ((NeedleHeaderSize + n.Size + NeedleChecksumSize) % NeedlePaddingSize) util.Uint32toBytes(header[0:NeedleChecksumSize], n.Checksum.Value()) @@ -194,6 +205,10 @@ func (n *Needle) readNeedleDataVersion2(bytes []byte) { n.LastModified = util.BytesToUint64(bytes[index : index+LastModifiedBytesLength]) index = index + LastModifiedBytesLength } + if index < lenBytes && n.HasTtl() { + n.Ttl = LoadTTLFromBytes(bytes[index : index+TtlBytesLength]) + index = index + TtlBytesLength + } } func ReadNeedleHeader(r *os.File, version Version, offset int64) (n *Needle, bodyLength uint32, err error) { @@ -263,3 +278,9 @@ func (n *Needle) HasLastModifiedDate() bool { func (n *Needle) SetHasLastModifiedDate() { n.Flags = n.Flags | FlagHasLastModifiedDate } +func (n *Needle) HasTtl() bool { + return n.Flags&FlagHasTtl > 0 +} +func (n *Needle) SetHasTtl() { + n.Flags = n.Flags | FlagHasTtl +} diff --git a/go/storage/replica_placement.go b/go/storage/replica_placement.go index 696888cd8..c1aca52eb 100644 --- a/go/storage/replica_placement.go +++ b/go/storage/replica_placement.go @@ -5,10 +5,6 @@ import ( "fmt" ) -const ( - ReplicaPlacementCount = 9 -) - type ReplicaPlacement struct { SameRackCount int DiffRackCount int @@ -55,7 +51,3 @@ func (rp *ReplicaPlacement) String() string { func (rp *ReplicaPlacement) GetCopyCount() int { return rp.DiffDataCenterCount + rp.DiffRackCount + rp.SameRackCount + 1 } - -func (rp *ReplicaPlacement) GetReplicationLevelIndex() int { - return rp.DiffDataCenterCount*3 + rp.DiffRackCount*3 + rp.SameRackCount -} diff --git a/go/storage/store.go b/go/storage/store.go index a6a4f399e..ef38ade98 100644 --- a/go/storage/store.go +++ b/go/storage/store.go @@ -14,6 +14,10 @@ import ( "strings" ) +const ( + MAX_TTL_VOLUME_REMOVAL_DELAY = 10 // 10 minutes +) + type DiskLocation struct { Directory string MaxVolumeCount int @@ -83,11 +87,15 @@ func NewStore(port int, ip, publicUrl string, dirnames []string, maxVolumeCounts } return } -func (s *Store) AddVolume(volumeListString string, collection string, replicaPlacement string) error { +func (s *Store) AddVolume(volumeListString string, collection string, replicaPlacement string, ttlString string) error { rt, e := NewReplicaPlacementFromString(replicaPlacement) if e != nil { return e } + ttl, e := ReadTTL(ttlString) + if e != nil { + return e + } for _, range_string := range strings.Split(volumeListString, ",") { if strings.Index(range_string, "-") < 0 { id_string := range_string @@ -95,7 +103,7 @@ func (s *Store) AddVolume(volumeListString string, collection string, replicaPla if err != nil { return fmt.Errorf("Volume Id %s is not a valid unsigned integer!", id_string) } - e = s.addVolume(VolumeId(id), collection, rt) + e = s.addVolume(VolumeId(id), collection, rt, ttl) } else { pair := strings.Split(range_string, "-") start, start_err := strconv.ParseUint(pair[0], 10, 64) @@ -107,7 +115,7 @@ func (s *Store) AddVolume(volumeListString string, collection string, replicaPla return fmt.Errorf("Volume End Id %s is not a valid unsigned integer!", pair[1]) } for id := start; id <= end; id++ { - if err := s.addVolume(VolumeId(id), collection, rt); err != nil { + if err := s.addVolume(VolumeId(id), collection, rt, ttl); err != nil { e = err } } @@ -129,6 +137,14 @@ func (s *Store) DeleteCollection(collection string) (e error) { } return } +func (s *Store) DeleteVolume(volumes map[VolumeId]*Volume, v *Volume) (e error) { + e = v.Destroy() + if e != nil { + return + } + delete(volumes, v.Id) + return +} func (s *Store) findVolume(vid VolumeId) *Volume { for _, location := range s.Locations { if v, found := location.volumes[vid]; found { @@ -148,13 +164,14 @@ func (s *Store) findFreeLocation() (ret *DiskLocation) { } return ret } -func (s *Store) addVolume(vid VolumeId, collection string, replicaPlacement *ReplicaPlacement) error { +func (s *Store) addVolume(vid VolumeId, collection string, replicaPlacement *ReplicaPlacement, ttl *TTL) error { if s.findVolume(vid) != nil { return fmt.Errorf("Volume Id %d already exists!", vid) } if location := s.findFreeLocation(); location != nil { - glog.V(0).Infoln("In dir", location.Directory, "adds volume =", vid, ", collection =", collection, ", replicaPlacement =", replicaPlacement) - if volume, err := NewVolume(location.Directory, collection, vid, replicaPlacement); err == nil { + glog.V(0).Infof("In dir %s adds volume:%v collection:%s replicaPlacement:%v ttl:%v", + location.Directory, vid, collection, replicaPlacement, ttl) + if volume, err := NewVolume(location.Directory, collection, vid, replicaPlacement, ttl); err == nil { location.volumes[vid] = volume return nil } else { @@ -190,9 +207,9 @@ func (l *DiskLocation) loadExistingVolumes() { } if vid, err := NewVolumeId(base); err == nil { if l.volumes[vid] == nil { - if v, e := NewVolume(l.Directory, collection, vid, nil); e == nil { + if v, e := NewVolume(l.Directory, collection, vid, nil, nil); e == nil { l.volumes[vid] = v - glog.V(0).Infoln("data file", l.Directory+"/"+name, "replicaPlacement =", v.ReplicaPlacement, "version =", v.Version(), "size =", v.Size()) + glog.V(0).Infof("data file %s, replicaPlacement=%s v=%d size=%d ttl=%s", l.Directory+"/"+name, v.ReplicaPlacement, v.Version(), v.Size(), v.Ttl.String()) } } } @@ -240,21 +257,31 @@ func (s *Store) Join() (masterNode string, e error) { for _, location := range s.Locations { maxVolumeCount = maxVolumeCount + location.MaxVolumeCount for k, v := range location.volumes { - volumeMessage := &operation.VolumeInformationMessage{ - Id: proto.Uint32(uint32(k)), - Size: proto.Uint64(uint64(v.Size())), - Collection: proto.String(v.Collection), - FileCount: proto.Uint64(uint64(v.nm.FileCount())), - DeleteCount: proto.Uint64(uint64(v.nm.DeletedCount())), - DeletedByteCount: proto.Uint64(v.nm.DeletedSize()), - ReadOnly: proto.Bool(v.readOnly), - ReplicaPlacement: proto.Uint32(uint32(v.ReplicaPlacement.Byte())), - Version: proto.Uint32(uint32(v.Version())), - } - volumeMessages = append(volumeMessages, volumeMessage) if maxFileKey < v.nm.MaxFileKey() { maxFileKey = v.nm.MaxFileKey() } + if !v.expired(s.volumeSizeLimit) { + volumeMessage := &operation.VolumeInformationMessage{ + Id: proto.Uint32(uint32(k)), + Size: proto.Uint64(uint64(v.Size())), + Collection: proto.String(v.Collection), + FileCount: proto.Uint64(uint64(v.nm.FileCount())), + DeleteCount: proto.Uint64(uint64(v.nm.DeletedCount())), + DeletedByteCount: proto.Uint64(v.nm.DeletedSize()), + ReadOnly: proto.Bool(v.readOnly), + ReplicaPlacement: proto.Uint32(uint32(v.ReplicaPlacement.Byte())), + Version: proto.Uint32(uint32(v.Version())), + Ttl: proto.Uint32(v.Ttl.ToUint32()), + } + volumeMessages = append(volumeMessages, volumeMessage) + } else { + if v.exiredLongEnough(MAX_TTL_VOLUME_REMOVAL_DELAY) { + s.DeleteVolume(location.volumes, v) + glog.V(0).Infoln("volume", v.Id, "is deleted.") + } else { + glog.V(0).Infoln("volume", v.Id, "is expired.") + } + } } } diff --git a/go/storage/volume.go b/go/storage/volume.go index dec560545..34ae7e386 100644 --- a/go/storage/volume.go +++ b/go/storage/volume.go @@ -22,12 +22,13 @@ type Volume struct { SuperBlock - accessLock sync.Mutex + accessLock sync.Mutex + lastModifiedTime uint64 //unix time in seconds } -func NewVolume(dirname string, collection string, id VolumeId, replicaPlacement *ReplicaPlacement) (v *Volume, e error) { +func NewVolume(dirname string, collection string, id VolumeId, replicaPlacement *ReplicaPlacement, ttl *TTL) (v *Volume, e error) { v = &Volume{dir: dirname, Collection: collection, Id: id} - v.SuperBlock = SuperBlock{ReplicaPlacement: replicaPlacement} + v.SuperBlock = SuperBlock{ReplicaPlacement: replicaPlacement, Ttl: ttl} e = v.load(true, true) return } @@ -49,12 +50,13 @@ func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool) error { var e error fileName := v.FileName() - if exists, canRead, canWrite, _ := checkFile(fileName + ".dat"); exists { + if exists, canRead, canWrite, modifiedTime := checkFile(fileName + ".dat"); exists { if !canRead { return fmt.Errorf("cannot read Volume Data file %s.dat", fileName) } if canWrite { v.dataFile, e = os.OpenFile(fileName+".dat", os.O_RDWR|os.O_CREATE, 0644) + v.lastModifiedTime = uint64(modifiedTime.Unix()) } else { glog.V(0).Infoln("opening " + fileName + ".dat in READONLY mode") v.dataFile, e = os.Open(fileName + ".dat") @@ -192,6 +194,9 @@ func (v *Volume) write(n *Needle) (size uint32, err error) { glog.V(4).Infof("failed to save in needle map %d: %s", n.Id, err.Error()) } } + if v.lastModifiedTime < n.LastModified { + v.lastModifiedTime = n.LastModified + } return } @@ -221,8 +226,25 @@ func (v *Volume) delete(n *Needle) (uint32, error) { func (v *Volume) read(n *Needle) (int, error) { nv, ok := v.nm.Get(n.Id) - if ok && nv.Offset > 0 { - return n.Read(v.dataFile, int64(nv.Offset)*NeedlePaddingSize, nv.Size, v.Version()) + if !ok || nv.Offset == 0 { + return -1, errors.New("Not Found") + } + bytesRead, err := n.Read(v.dataFile, int64(nv.Offset)*NeedlePaddingSize, nv.Size, v.Version()) + if err != nil { + return bytesRead, err + } + if !n.HasTtl() { + return bytesRead, err + } + ttlMinutes := n.Ttl.Minutes() + if ttlMinutes == 0 { + return bytesRead, nil + } + if !n.HasLastModifiedDate() { + return bytesRead, nil + } + if uint64(time.Now().Unix()) < n.LastModified+uint64(ttlMinutes*60) { + return bytesRead, nil } return -1, errors.New("Not Found") } @@ -343,3 +365,43 @@ func (v *Volume) ensureConvertIdxToCdb(fileName string) (cdbCanRead bool) { } return true } + +// volume is expired if modified time + volume ttl < now +// except when volume is empty +// or when the volume does not have a ttl +// or when volumeSizeLimit is 0 when server just starts +func (v *Volume) expired(volumeSizeLimit uint64) bool { + if volumeSizeLimit == 0 { + //skip if we don't know size limit + return false + } + if v.ContentSize() == 0 { + return false + } + if v.Ttl == nil || v.Ttl.Minutes() == 0 { + return false + } + glog.V(0).Infof("now:%v lastModified:%v", time.Now().Unix(), v.lastModifiedTime) + livedMinutes := (time.Now().Unix() - int64(v.lastModifiedTime)) / 60 + glog.V(0).Infof("ttl:%v lived:%v", v.Ttl, livedMinutes) + if int64(v.Ttl.Minutes()) < livedMinutes { + return true + } + return false +} + +// wait either maxDelayMinutes or 10% of ttl minutes +func (v *Volume) exiredLongEnough(maxDelayMinutes uint32) bool { + if v.Ttl == nil || v.Ttl.Minutes() == 0 { + return false + } + removalDelay := v.Ttl.Minutes() / 10 + if removalDelay > maxDelayMinutes { + removalDelay = maxDelayMinutes + } + + if uint64(v.Ttl.Minutes()+removalDelay)*60+v.lastModifiedTime < uint64(time.Now().Unix()) { + return true + } + return false +} diff --git a/go/storage/volume_info.go b/go/storage/volume_info.go index 165af1a19..6a954f743 100644 --- a/go/storage/volume_info.go +++ b/go/storage/volume_info.go @@ -8,6 +8,7 @@ type VolumeInfo struct { Id VolumeId Size uint64 ReplicaPlacement *ReplicaPlacement + Ttl *TTL Collection string Version Version FileCount int @@ -32,5 +33,6 @@ func NewVolumeInfo(m *operation.VolumeInformationMessage) (vi VolumeInfo, err er return vi, e } vi.ReplicaPlacement = rp + vi.Ttl = LoadTTLFromUint32(*m.Ttl) return vi, nil } diff --git a/go/storage/volume_super_block.go b/go/storage/volume_super_block.go index 35030b93e..3fbef44d6 100644 --- a/go/storage/volume_super_block.go +++ b/go/storage/volume_super_block.go @@ -2,7 +2,6 @@ package storage import ( "code.google.com/p/weed-fs/go/glog" - "code.google.com/p/weed-fs/go/util" "fmt" "os" ) @@ -15,12 +14,13 @@ const ( * Super block currently has 8 bytes allocated for each volume. * Byte 0: version, 1 or 2 * Byte 1: Replica Placement strategy, 000, 001, 002, 010, etc -* Byte 2 and byte 3: Time to live in minutes +* Byte 2 and byte 3: Time to live. See TTL for definition +* Rest bytes: Reserved */ type SuperBlock struct { version Version ReplicaPlacement *ReplicaPlacement - Ttl uint16 + Ttl *TTL } func (s *SuperBlock) Version() Version { @@ -30,7 +30,7 @@ func (s *SuperBlock) Bytes() []byte { header := make([]byte, SuperBlockSize) header[0] = byte(s.version) header[1] = s.ReplicaPlacement.Byte() - util.Uint16toBytes(header[2:4], s.Ttl) + s.Ttl.ToBytes(header[2:4]) return header } @@ -70,6 +70,6 @@ func ParseSuperBlock(header []byte) (superBlock SuperBlock, err error) { if superBlock.ReplicaPlacement, err = NewReplicaPlacementFromByte(header[1]); err != nil { err = fmt.Errorf("cannot read replica type: %s", err.Error()) } - superBlock.Ttl = util.BytesToUint16(header[2:4]) + superBlock.Ttl = LoadTTLFromBytes(header[2:4]) return } diff --git a/go/storage/volume_super_block_test.go b/go/storage/volume_super_block_test.go index 19a1bb757..13db4b194 100644 --- a/go/storage/volume_super_block_test.go +++ b/go/storage/volume_super_block_test.go @@ -6,16 +6,17 @@ import ( func TestSuperBlockReadWrite(t *testing.T) { rp, _ := NewReplicaPlacementFromByte(byte(001)) + ttl, _ := ReadTTL("15d") s := &SuperBlock{ version: CurrentVersion, ReplicaPlacement: rp, - Ttl: uint16(35), + Ttl: ttl, } bytes := s.Bytes() - if !(bytes[2] == 0 && bytes[3] == 35) { - println("byte[2]:", bytes[2], "byte[3]:", bytes[3]) + if !(bytes[2] == 15 && bytes[3] == Day) { + println("byte[2]:", bytes[2], "byte[3]:", bytes[3]) t.Fail() } diff --git a/go/storage/volume_ttl.go b/go/storage/volume_ttl.go new file mode 100644 index 000000000..5ff43e24e --- /dev/null +++ b/go/storage/volume_ttl.go @@ -0,0 +1,147 @@ +package storage + +import ( + "strconv" +) + +var ( + TtlRange []uint16 +) + +const ( + //stored unit types + Empty byte = iota + Minute + Hour + Day + Week + Month + Year +) + +func init() { + TtlRange = []uint16{3, 10, 30, + 60 /*1 hour*/, 60 * 3, 60 * 6, 60 * 12, + 1440 /*1 day*/, 1440 * 3, 1440 * 7, 1440 * 15, 1440 * 31, + 44888 /*1 month*/, 65535, + } +} + +type TTL struct { + count byte + unit byte +} + +var EMPTY_TTL = &TTL{} + +// translate a readable ttl to internal ttl +// Supports format example: +// 3m: 3 minutes +// 4h: 4 hours +// 5d: 5 days +// 6w: 6 weeks +// 7M: 7 months +// 8y: 8 years +func ReadTTL(ttlString string) (*TTL, error) { + if ttlString == "" { + return EMPTY_TTL, nil + } + ttlBytes := []byte(ttlString) + unitByte := ttlBytes[len(ttlBytes)-1] + countBytes := ttlBytes[0 : len(ttlBytes)-1] + if '0' <= unitByte && unitByte <= '9' { + countBytes = ttlBytes + unitByte = 'm' + } + count, err := strconv.Atoi(string(countBytes)) + unit := toStoredByte(unitByte) + return &TTL{count: byte(count), unit: unit}, err +} + +// read stored bytes to a ttl +func LoadTTLFromBytes(input []byte) (t *TTL) { + return &TTL{count: input[0], unit: input[1]} +} + +// read stored bytes to a ttl +func LoadTTLFromUint32(ttl uint32) (t *TTL) { + input := make([]byte, 2) + input[1] = byte(ttl) + input[0] = byte(ttl >> 8) + return LoadTTLFromBytes(input) +} + +// save stored bytes to an output with 2 bytes +func (t TTL) ToBytes(output []byte) { + output[0] = t.count + output[1] = t.unit +} + +func (t TTL) ToUint32() (output uint32) { + output = uint32(t.count) << 8 + output += uint32(t.unit) + return output +} + +func (t TTL) String() string { + if t.count == 0 { + return "" + } + if t.unit == Empty { + return "" + } + countString := strconv.Itoa(int(t.count)) + switch t.unit { + case Minute: + return countString + "m" + case Hour: + return countString + "h" + case Day: + return countString + "d" + case Week: + return countString + "w" + case Month: + return countString + "M" + case Year: + return countString + "y" + } + return "" +} + +func toStoredByte(readableUnitByte byte) byte { + switch readableUnitByte { + case 'm': + return Minute + case 'h': + return Hour + case 'd': + return Day + case 'w': + return Week + case 'M': + return Month + case 'Y': + return Year + } + return 0 +} + +func (t TTL) Minutes() uint32 { + switch t.unit { + case Empty: + return 0 + case Minute: + return uint32(t.count) + case Hour: + return uint32(t.count) * 60 + case Day: + return uint32(t.count) * 60 * 24 + case Week: + return uint32(t.count) * 60 * 24 * 7 + case Month: + return uint32(t.count) * 60 * 24 * 31 + case Year: + return uint32(t.count) * 60 * 24 * 31 * 365 + } + return 0 +} diff --git a/go/storage/volume_vacuum.go b/go/storage/volume_vacuum.go index 7d2a38cb8..706a1f951 100644 --- a/go/storage/volume_vacuum.go +++ b/go/storage/volume_vacuum.go @@ -4,7 +4,7 @@ import ( "code.google.com/p/weed-fs/go/glog" "fmt" "os" - _ "time" + "time" ) func (v *Volume) garbageLevel() float64 { @@ -13,9 +13,10 @@ func (v *Volume) garbageLevel() float64 { func (v *Volume) Compact() error { glog.V(3).Infof("Compacting ...") - v.accessLock.Lock() - defer v.accessLock.Unlock() - glog.V(3).Infof("Got Compaction lock...") + //no need to lock for copy on write + //v.accessLock.Lock() + //defer v.accessLock.Unlock() + //glog.V(3).Infof("Got Compaction lock...") filePath := v.FileName() glog.V(3).Infof("creating copies for volume %d ...", v.Id) @@ -59,10 +60,15 @@ func (v *Volume) copyDataAndGenerateIndexFile(dstName, idxName string) (err erro nm := NewNeedleMap(idx) new_offset := int64(SuperBlockSize) + now := uint64(time.Now().Unix()) + err = ScanVolumeFile(v.dir, v.Collection, v.Id, func(superBlock SuperBlock) error { _, err = dst.Write(superBlock.Bytes()) return err }, true, func(n *Needle, offset int64) error { + if n.HasTtl() && now >= n.LastModified+uint64(v.Ttl.Minutes()*60) { + return nil + } nv, ok := v.nm.Get(n.Id) glog.V(4).Infoln("needle expected offset ", offset, "ok", ok, "nv", nv) if ok && int64(nv.Offset)*NeedlePaddingSize == offset && nv.Size > 0 { |
