diff options
28 files changed, 410 insertions, 408 deletions
diff --git a/.travis.yml b/.travis.yml index 612f643e9..b42847e8e 100644 --- a/.travis.yml +++ b/.travis.yml @@ -29,9 +29,12 @@ deploy: - build/linux_arm64.tar.gz - build/linux_386.tar.gz - build/linux_amd64.tar.gz + - build/linux_amd64_large_disk.tar.gz - build/darwin_amd64.tar.gz + - build/darwin_amd64_large_disk.tar.gz - build/windows_386.zip - build/windows_amd64.zip + - build/windows_amd64_large_disk.zip - build/freebsd_arm.tar.gz - build/freebsd_amd64.tar.gz - build/freebsd_386.tar.gz @@ -12,6 +12,9 @@ build = CGO_ENABLED=0 GOOS=$(1) GOARCH=$(2) go build -ldflags "-extldflags -stat tar = cd build && tar -cvzf $(1)_$(2).tar.gz $(appname)$(3) && rm $(appname)$(3) zip = cd build && zip $(1)_$(2).zip $(appname)$(3) && rm $(appname)$(3) +build_large = CGO_ENABLED=0 GOOS=$(1) GOARCH=$(2) go build -tags 5BytesOffset -ldflags "-extldflags -static" -o build/$(appname)$(3) $(SOURCE_DIR) +tar_large = cd build && tar -cvzf $(1)_$(2)_large_disk.tar.gz $(appname)$(3) && rm $(appname)$(3) +zip_large = cd build && zip $(1)_$(2)_large_disk.zip $(appname)$(3) && rm $(appname)$(3) all: build @@ -32,9 +35,21 @@ linux: deps mkdir -p linux GOOS=linux GOARCH=amd64 go build $(GO_FLAGS) -o linux/$(BINARY) $(SOURCE_DIR) -release: deps windows_build darwin_build linux_build bsd_build +release: deps windows_build darwin_build linux_build bsd_build 5_byte_linux_build 5_byte_darwin_build 5_byte_windows_build ##### LINUX BUILDS ##### +5_byte_linux_build: + $(call build_large,linux,amd64,) + $(call tar_large,linux,amd64) + +5_byte_darwin_build: + $(call build_large,darwin,amd64,) + $(call tar_large,darwin,amd64) + +5_byte_windows_build: + $(call build_large,windows,amd64,.exe) + $(call zip_large,windows,amd64,.exe) + linux_build: build/linux_arm.tar.gz build/linux_arm64.tar.gz build/linux_386.tar.gz build/linux_amd64.tar.gz build/linux_386.tar.gz: $(sources) diff --git a/weed/command/export.go b/weed/command/export.go index cdced5936..47abc2929 100644 --- a/weed/command/export.go +++ b/weed/command/export.go @@ -107,7 +107,7 @@ func (scanner *VolumeFileScanner4Export) VisitNeedle(n *storage.Needle, offset i nv, ok := needleMap.Get(n.Id) glog.V(3).Infof("key %d offset %d size %d disk_size %d gzip %v ok %v nv %+v", n.Id, offset, n.Size, n.DiskSize(scanner.version), n.IsGzipped(), ok, nv) - if ok && nv.Size > 0 && nv.Size != types.TombstoneFileSize && int64(nv.Offset)*types.NeedlePaddingSize == offset { + if ok && nv.Size > 0 && nv.Size != types.TombstoneFileSize && nv.Offset.ToAcutalOffset() == offset { if newerThanUnix >= 0 && n.HasLastModifiedDate() && n.LastModified < uint64(newerThanUnix) { glog.V(3).Infof("Skipping this file, as it's old enough: LastModified %d vs %d", n.LastModified, newerThanUnix) diff --git a/weed/command/fix.go b/weed/command/fix.go index 42ae23a3c..2536d774f 100644 --- a/weed/command/fix.go +++ b/weed/command/fix.go @@ -45,11 +45,11 @@ func (scanner *VolumeFileScanner4Fix) ReadNeedleBody() bool { func (scanner *VolumeFileScanner4Fix) VisitNeedle(n *storage.Needle, offset int64) error { glog.V(2).Infof("key %d offset %d size %d disk_size %d gzip %v", n.Id, offset, n.Size, n.DiskSize(scanner.version), n.IsGzipped()) if n.Size > 0 && n.Size != types.TombstoneFileSize { - pe := scanner.nm.Put(n.Id, types.Offset(offset/types.NeedlePaddingSize), n.Size) + pe := scanner.nm.Put(n.Id, types.ToOffset(offset), n.Size) glog.V(2).Infof("saved %d with error %v", n.Size, pe) } else { glog.V(2).Infof("skipping deleted file ...") - return scanner.nm.Delete(n.Id, types.Offset(offset/types.NeedlePaddingSize)) + return scanner.nm.Delete(n.Id, types.ToOffset(offset)) } return nil } diff --git a/weed/command/master.go b/weed/command/master.go index 15d1171e0..cd5704c3f 100644 --- a/weed/command/master.go +++ b/weed/command/master.go @@ -70,7 +70,7 @@ func runMaster(cmd *Command, args []string) bool { if *masterWhiteListOption != "" { masterWhiteList = strings.Split(*masterWhiteListOption, ",") } - if *volumeSizeLimitMB > 30*1000 { + if *volumeSizeLimitMB > util.VolumeSizeLimitGB*1000 { glog.Fatalf("volumeSizeLimitMB should be smaller than 30000") } diff --git a/weed/command/server.go b/weed/command/server.go index d88ded0ee..a56944b48 100644 --- a/weed/command/server.go +++ b/weed/command/server.go @@ -90,7 +90,7 @@ func init() { serverOptions.v.port = cmdServer.Flag.Int("volume.port", 8080, "volume server http listen port") serverOptions.v.publicPort = cmdServer.Flag.Int("volume.port.public", 0, "volume server public port") - serverOptions.v.indexType = cmdServer.Flag.String("volume.index", "memory", "Choose [memory|leveldb|boltdb|btree] mode for memory~performance balance.") + serverOptions.v.indexType = cmdServer.Flag.String("volume.index", "memory", "Choose [memory|leveldb|leveldbMedium|leveldbLarge] mode for memory~performance balance.") serverOptions.v.fixJpgOrientation = cmdServer.Flag.Bool("volume.images.fix.orientation", false, "Adjust jpg orientation when uploading.") serverOptions.v.readRedirect = cmdServer.Flag.Bool("volume.read.redirect", true, "Redirect moved or non-local volumes.") serverOptions.v.publicUrl = cmdServer.Flag.String("volume.publicUrl", "", "publicly accessible address") @@ -140,7 +140,7 @@ func runServer(cmd *Command, args []string) bool { folders := strings.Split(*volumeDataFolders, ",") - if *masterVolumeSizeLimitMB > 30*1000 { + if *masterVolumeSizeLimitMB > util.VolumeSizeLimitGB*1000 { glog.Fatalf("masterVolumeSizeLimitMB should be less than 30000") } diff --git a/weed/command/volume.go b/weed/command/volume.go index 2ee6bb11a..b87555456 100644 --- a/weed/command/volume.go +++ b/weed/command/volume.go @@ -58,7 +58,7 @@ func init() { v.maxCpu = cmdVolume.Flag.Int("maxCpu", 0, "maximum number of CPUs. 0 means all available CPUs") v.dataCenter = cmdVolume.Flag.String("dataCenter", "", "current volume server's data center name") v.rack = cmdVolume.Flag.String("rack", "", "current volume server's rack name") - v.indexType = cmdVolume.Flag.String("index", "memory", "Choose [memory|leveldb|boltdb|btree] mode for memory~performance balance.") + v.indexType = cmdVolume.Flag.String("index", "memory", "Choose [memory|leveldb|leveldbMedium|leveldbLarge] mode for memory~performance balance.") v.fixJpgOrientation = cmdVolume.Flag.Bool("images.fix.orientation", false, "Adjust jpg orientation when uploading.") v.readRedirect = cmdVolume.Flag.Bool("read.redirect", true, "Redirect moved or non-local volumes.") v.cpuProfile = cmdVolume.Flag.String("cpuprofile", "", "cpu profile output file") @@ -142,10 +142,10 @@ func (v VolumeServerOptions) startVolumeServer(volumeFolders, maxVolumeCounts, v switch *v.indexType { case "leveldb": volumeNeedleMapKind = storage.NeedleMapLevelDb - case "boltdb": - volumeNeedleMapKind = storage.NeedleMapBoltDb - case "btree": - volumeNeedleMapKind = storage.NeedleMapBtree + case "leveldbMedium": + volumeNeedleMapKind = storage.NeedleMapLevelDbMedium + case "leveldbLarge": + volumeNeedleMapKind = storage.NeedleMapLevelDbLarge } masters := *v.masters diff --git a/weed/server/volume_grpc_follow.go b/weed/server/volume_grpc_follow.go index cc5dcc78e..c3ce774c0 100644 --- a/weed/server/volume_grpc_follow.go +++ b/weed/server/volume_grpc_follow.go @@ -3,7 +3,6 @@ package weed_server import ( "context" "fmt" - "github.com/chrislusf/seaweedfs/weed/storage/types" "io" "os" @@ -28,7 +27,7 @@ func (vs *VolumeServer) VolumeFollow(req *volume_server_pb.VolumeFollowRequest, return nil } - startOffset := int64(foundOffset) * int64(types.NeedlePaddingSize) + startOffset := foundOffset.ToAcutalOffset() buf := make([]byte, 1024*1024*2) return sendFileContent(v.DataFile(), buf, startOffset, stopOffset, stream) diff --git a/weed/storage/needle/btree_map.go b/weed/storage/needle/btree_map.go index d688b802e..aed940f0e 100644 --- a/weed/storage/needle/btree_map.go +++ b/weed/storage/needle/btree_map.go @@ -26,7 +26,7 @@ func (cm *BtreeMap) Set(key NeedleId, offset Offset, size uint32) (oldOffset Off } func (cm *BtreeMap) Delete(key NeedleId) (oldSize uint32) { - found := cm.tree.Delete(NeedleValue{key, 0, 0}) + found := cm.tree.Delete(NeedleValue{key, Offset{}, 0}) if found != nil { old := found.(NeedleValue) return old.Size @@ -34,7 +34,7 @@ func (cm *BtreeMap) Delete(key NeedleId) (oldSize uint32) { return } func (cm *BtreeMap) Get(key NeedleId) (*NeedleValue, bool) { - found := cm.tree.Get(NeedleValue{key, 0, 0}) + found := cm.tree.Get(NeedleValue{key, Offset{}, 0}) if found != nil { old := found.(NeedleValue) return &old, true diff --git a/weed/storage/needle/compact_map.go b/weed/storage/needle/compact_map.go index bca698407..cb0bf2e51 100644 --- a/weed/storage/needle/compact_map.go +++ b/weed/storage/needle/compact_map.go @@ -15,27 +15,36 @@ type SectionalNeedleId uint32 const SectionalNeedleIdLimit = 1<<32 - 1 type SectionalNeedleValue struct { - Key SectionalNeedleId - Offset Offset `comment:"Volume offset"` //since aligned to 8 bytes, range is 4G*8=32G - Size uint32 `comment:"Size of the data portion"` + Key SectionalNeedleId + OffsetLower OffsetLower `comment:"Volume offset"` //since aligned to 8 bytes, range is 4G*8=32G + Size uint32 `comment:"Size of the data portion"` +} + +type SectionalNeedleValueExtra struct { + OffsetHigher OffsetHigher } type CompactSection struct { sync.RWMutex - values []SectionalNeedleValue - overflow Overflow - start NeedleId - end NeedleId - counter int + values []SectionalNeedleValue + valuesExtra []SectionalNeedleValueExtra + overflow Overflow + overflowExtra OverflowExtra + start NeedleId + end NeedleId + counter int } type Overflow []SectionalNeedleValue +type OverflowExtra []SectionalNeedleValueExtra func NewCompactSection(start NeedleId) *CompactSection { return &CompactSection{ - values: make([]SectionalNeedleValue, batch), - overflow: Overflow(make([]SectionalNeedleValue, 0)), - start: start, + values: make([]SectionalNeedleValue, batch), + valuesExtra: make([]SectionalNeedleValueExtra, batch), + overflow: Overflow(make([]SectionalNeedleValue, 0)), + overflowExtra: OverflowExtra(make([]SectionalNeedleValueExtra, 0)), + start: start, } } @@ -47,21 +56,21 @@ func (cs *CompactSection) Set(key NeedleId, offset Offset, size uint32) (oldOffs } skey := SectionalNeedleId(key - cs.start) if i := cs.binarySearchValues(skey); i >= 0 { - oldOffset, oldSize = cs.values[i].Offset, cs.values[i].Size + oldOffset.OffsetHigher, oldOffset.OffsetLower, oldSize = cs.valuesExtra[i].OffsetHigher, cs.values[i].OffsetLower, cs.values[i].Size //println("key", key, "old size", ret) - cs.values[i].Offset, cs.values[i].Size = offset, size + cs.valuesExtra[i].OffsetHigher, cs.values[i].OffsetLower, cs.values[i].Size = offset.OffsetHigher, offset.OffsetLower, size } else { needOverflow := cs.counter >= batch needOverflow = needOverflow || cs.counter > 0 && cs.values[cs.counter-1].Key > skey if needOverflow { //println("start", cs.start, "counter", cs.counter, "key", key) - if oldValue, found := cs.overflow.findOverflowEntry(skey); found { - oldOffset, oldSize = oldValue.Offset, oldValue.Size + if oldValueExtra, oldValue, found := cs.findOverflowEntry(skey); found { + oldOffset.OffsetHigher, oldOffset.OffsetLower, oldSize = oldValueExtra.OffsetHigher, oldValue.OffsetLower, oldValue.Size } - cs.overflow = cs.overflow.setOverflowEntry(SectionalNeedleValue{Key: skey, Offset: offset, Size: size}) + cs.setOverflowEntry(skey, offset, size) } else { p := &cs.values[cs.counter] - p.Key, p.Offset, p.Size = skey, offset, size + p.Key, cs.valuesExtra[cs.counter].OffsetHigher, p.OffsetLower, p.Size = skey, offset.OffsetHigher, offset.OffsetLower, size //println("added index", cs.counter, "key", key, cs.values[cs.counter].Key) cs.counter++ } @@ -70,6 +79,50 @@ func (cs *CompactSection) Set(key NeedleId, offset Offset, size uint32) (oldOffs return } +func (cs *CompactSection) setOverflowEntry(skey SectionalNeedleId, offset Offset, size uint32) { + needleValue := SectionalNeedleValue{Key: skey, OffsetLower: offset.OffsetLower, Size: size} + needleValueExtra := SectionalNeedleValueExtra{OffsetHigher: OffsetHigher{}} + insertCandidate := sort.Search(len(cs.overflow), func(i int) bool { + return cs.overflow[i].Key >= needleValue.Key + }) + if insertCandidate != len(cs.overflow) && cs.overflow[insertCandidate].Key == needleValue.Key { + cs.overflow[insertCandidate] = needleValue + } else { + cs.overflow = append(cs.overflow, needleValue) + cs.overflowExtra = append(cs.overflowExtra, needleValueExtra) + for i := len(cs.overflow) - 1; i > insertCandidate; i-- { + cs.overflow[i] = cs.overflow[i-1] + cs.overflowExtra[i] = cs.overflowExtra[i-1] + } + cs.overflow[insertCandidate] = needleValue + } +} + +func (cs *CompactSection) findOverflowEntry(key SectionalNeedleId) (nve SectionalNeedleValueExtra, nv SectionalNeedleValue, found bool) { + foundCandidate := sort.Search(len(cs.overflow), func(i int) bool { + return cs.overflow[i].Key >= key + }) + if foundCandidate != len(cs.overflow) && cs.overflow[foundCandidate].Key == key { + return cs.overflowExtra[foundCandidate], cs.overflow[foundCandidate], true + } + return nve, nv, false +} + +func (cs *CompactSection) deleteOverflowEntry(key SectionalNeedleId) { + length := len(cs.overflow) + deleteCandidate := sort.Search(length, func(i int) bool { + return cs.overflow[i].Key >= key + }) + if deleteCandidate != length && cs.overflow[deleteCandidate].Key == key { + for i := deleteCandidate; i < length-1; i++ { + cs.overflow[i] = cs.overflow[i+1] + cs.overflowExtra[i] = cs.overflowExtra[i+1] + } + cs.overflow = cs.overflow[0 : length-1] + cs.overflowExtra = cs.overflowExtra[0 : length-1] + } +} + //return old entry size func (cs *CompactSection) Delete(key NeedleId) uint32 { skey := SectionalNeedleId(key - cs.start) @@ -81,8 +134,8 @@ func (cs *CompactSection) Delete(key NeedleId) uint32 { cs.values[i].Size = TombstoneFileSize } } - if v, found := cs.overflow.findOverflowEntry(skey); found { - cs.overflow = cs.overflow.deleteOverflowEntry(skey) + if _, v, found := cs.findOverflowEntry(skey); found { + cs.deleteOverflowEntry(skey) ret = v.Size } cs.Unlock() @@ -91,14 +144,14 @@ func (cs *CompactSection) Delete(key NeedleId) uint32 { func (cs *CompactSection) Get(key NeedleId) (*NeedleValue, bool) { cs.RLock() skey := SectionalNeedleId(key - cs.start) - if v, ok := cs.overflow.findOverflowEntry(skey); ok { + if ve, v, ok := cs.findOverflowEntry(skey); ok { cs.RUnlock() - nv := v.toNeedleValue(cs) + nv := toNeedleValue(ve, v, cs) return &nv, true } if i := cs.binarySearchValues(skey); i >= 0 { cs.RUnlock() - nv := cs.values[i].toNeedleValue(cs) + nv := toNeedleValue(cs.valuesExtra[i], cs.values[i], cs) return &nv, true } cs.RUnlock() @@ -194,8 +247,8 @@ func (cm *CompactMap) binarySearchCompactSection(key NeedleId) int { func (cm *CompactMap) Visit(visit func(NeedleValue) error) error { for _, cs := range cm.list { cs.RLock() - for _, v := range cs.overflow { - if err := visit(v.toNeedleValue(cs)); err != nil { + for i, v := range cs.overflow { + if err := visit(toNeedleValue(cs.overflowExtra[i], v, cs)); err != nil { cs.RUnlock() return err } @@ -204,8 +257,8 @@ func (cm *CompactMap) Visit(visit func(NeedleValue) error) error { if i >= cs.counter { break } - if _, found := cs.overflow.findOverflowEntry(v.Key); !found { - if err := visit(v.toNeedleValue(cs)); err != nil { + if _, _, found := cs.findOverflowEntry(v.Key); !found { + if err := visit(toNeedleValue(cs.valuesExtra[i], v, cs)); err != nil { cs.RUnlock() return err } @@ -216,50 +269,20 @@ func (cm *CompactMap) Visit(visit func(NeedleValue) error) error { return nil } -func (o Overflow) deleteOverflowEntry(key SectionalNeedleId) Overflow { - length := len(o) - deleteCandidate := sort.Search(length, func(i int) bool { - return o[i].Key >= key - }) - if deleteCandidate != length && o[deleteCandidate].Key == key { - for i := deleteCandidate; i < length-1; i++ { - o[i] = o[i+1] - } - o = o[0 : length-1] +func toNeedleValue(snve SectionalNeedleValueExtra, snv SectionalNeedleValue, cs *CompactSection) NeedleValue { + offset := Offset{ + OffsetHigher: snve.OffsetHigher, + OffsetLower: snv.OffsetLower, } - return o + return NeedleValue{Key: NeedleId(snv.Key) + cs.start, Offset: offset, Size: snv.Size} } -func (o Overflow) setOverflowEntry(needleValue SectionalNeedleValue) Overflow { - insertCandidate := sort.Search(len(o), func(i int) bool { - return o[i].Key >= needleValue.Key - }) - if insertCandidate != len(o) && o[insertCandidate].Key == needleValue.Key { - o[insertCandidate] = needleValue - } else { - o = append(o, needleValue) - for i := len(o) - 1; i > insertCandidate; i-- { - o[i] = o[i-1] - } - o[insertCandidate] = needleValue +func (nv NeedleValue) toSectionalNeedleValue(cs *CompactSection) (SectionalNeedleValue, SectionalNeedleValueExtra) { + return SectionalNeedleValue{ + SectionalNeedleId(nv.Key - cs.start), + nv.Offset.OffsetLower, + nv.Size, + }, SectionalNeedleValueExtra{ + nv.Offset.OffsetHigher, } - return o -} - -func (o Overflow) findOverflowEntry(key SectionalNeedleId) (nv SectionalNeedleValue, found bool) { - foundCandidate := sort.Search(len(o), func(i int) bool { - return o[i].Key >= key - }) - if foundCandidate != len(o) && o[foundCandidate].Key == key { - return o[foundCandidate], true - } - return nv, false -} - -func (snv SectionalNeedleValue) toNeedleValue(cs *CompactSection) NeedleValue { - return NeedleValue{NeedleId(snv.Key) + cs.start, snv.Offset, snv.Size} -} - -func (nv NeedleValue) toSectionalNeedleValue(cs *CompactSection) SectionalNeedleValue { - return SectionalNeedleValue{SectionalNeedleId(nv.Key - cs.start), nv.Offset, nv.Size} } diff --git a/weed/storage/needle/compact_map_perf_test.go b/weed/storage/needle/compact_map_perf_test.go index 908da968f..3f6fe548b 100644 --- a/weed/storage/needle/compact_map_perf_test.go +++ b/weed/storage/needle/compact_map_perf_test.go @@ -62,7 +62,7 @@ func loadNewNeedleMap(file *os.File) (*CompactMap, uint64) { offset := BytesToOffset(bytes[i+NeedleIdSize : i+NeedleIdSize+OffsetSize]) size := util.BytesToUint32(bytes[i+NeedleIdSize+OffsetSize : i+NeedleIdSize+OffsetSize+SizeSize]) - if offset > 0 { + if !offset.IsZero() { m.Set(NeedleId(key), offset, size) } else { m.Delete(key) diff --git a/weed/storage/needle/compact_map_test.go b/weed/storage/needle/compact_map_test.go index 73231053e..b9586ab54 100644 --- a/weed/storage/needle/compact_map_test.go +++ b/weed/storage/needle/compact_map_test.go @@ -1,22 +1,23 @@ package needle import ( + "fmt" . "github.com/chrislusf/seaweedfs/weed/storage/types" "testing" ) func TestOverflow2(t *testing.T) { m := NewCompactMap() - m.Set(NeedleId(150088), 8, 3000073) - m.Set(NeedleId(150073), 8, 3000073) - m.Set(NeedleId(150089), 8, 3000073) - m.Set(NeedleId(150076), 8, 3000073) - m.Set(NeedleId(150124), 8, 3000073) - m.Set(NeedleId(150137), 8, 3000073) - m.Set(NeedleId(150147), 8, 3000073) - m.Set(NeedleId(150145), 8, 3000073) - m.Set(NeedleId(150158), 8, 3000073) - m.Set(NeedleId(150162), 8, 3000073) + m.Set(NeedleId(150088), ToOffset(8), 3000073) + m.Set(NeedleId(150073), ToOffset(8), 3000073) + m.Set(NeedleId(150089), ToOffset(8), 3000073) + m.Set(NeedleId(150076), ToOffset(8), 3000073) + m.Set(NeedleId(150124), ToOffset(8), 3000073) + m.Set(NeedleId(150137), ToOffset(8), 3000073) + m.Set(NeedleId(150147), ToOffset(8), 3000073) + m.Set(NeedleId(150145), ToOffset(8), 3000073) + m.Set(NeedleId(150158), ToOffset(8), 3000073) + m.Set(NeedleId(150162), ToOffset(8), 3000073) m.Visit(func(value NeedleValue) error { println("needle key:", value.Key) @@ -26,13 +27,13 @@ func TestOverflow2(t *testing.T) { func TestIssue52(t *testing.T) { m := NewCompactMap() - m.Set(NeedleId(10002), 10002, 10002) + m.Set(NeedleId(10002), ToOffset(10002), 10002) if element, ok := m.Get(NeedleId(10002)); ok { - println("key", 10002, "ok", ok, element.Key, element.Offset, element.Size) + fmt.Printf("key %d ok %v %d, %v, %d\n", 10002, ok, element.Key, element.Offset, element.Size) } - m.Set(NeedleId(10001), 10001, 10001) + m.Set(NeedleId(10001), ToOffset(10001), 10001) if element, ok := m.Get(NeedleId(10002)); ok { - println("key", 10002, "ok", ok, element.Key, element.Offset, element.Size) + fmt.Printf("key %d ok %v %d, %v, %d\n", 10002, ok, element.Key, element.Offset, element.Size) } else { t.Fatal("key 10002 missing after setting 10001") } @@ -41,7 +42,7 @@ func TestIssue52(t *testing.T) { func TestCompactMap(t *testing.T) { m := NewCompactMap() for i := uint32(0); i < 100*batch; i += 2 { - m.Set(NeedleId(i), Offset(i), i) + m.Set(NeedleId(i), ToOffset(int64(i)), i) } for i := uint32(0); i < 100*batch; i += 37 { @@ -49,7 +50,7 @@ func TestCompactMap(t *testing.T) { } for i := uint32(0); i < 10*batch; i += 3 { - m.Set(NeedleId(i), Offset(i+11), i+5) + m.Set(NeedleId(i), ToOffset(int64(i+11)), i+5) } // for i := uint32(0); i < 100; i++ { @@ -97,60 +98,60 @@ func TestCompactMap(t *testing.T) { } func TestOverflow(t *testing.T) { - o := Overflow(make([]SectionalNeedleValue, 0)) + cs := NewCompactSection(1) - o = o.setOverflowEntry(SectionalNeedleValue{Key: 1, Offset: 12, Size: 12}) - o = o.setOverflowEntry(SectionalNeedleValue{Key: 2, Offset: 12, Size: 12}) - o = o.setOverflowEntry(SectionalNeedleValue{Key: 3, Offset: 12, Size: 12}) - o = o.setOverflowEntry(SectionalNeedleValue{Key: 4, Offset: 12, Size: 12}) - o = o.setOverflowEntry(SectionalNeedleValue{Key: 5, Offset: 12, Size: 12}) + cs.setOverflowEntry(1, ToOffset(12), 12) + cs.setOverflowEntry(2, ToOffset(12), 12) + cs.setOverflowEntry(3, ToOffset(12), 12) + cs.setOverflowEntry(4, ToOffset(12), 12) + cs.setOverflowEntry(5, ToOffset(12), 12) - if o[2].Key != 3 { - t.Fatalf("expecting o[2] has key 3: %+v", o[2].Key) + if cs.overflow[2].Key != 3 { + t.Fatalf("expecting o[2] has key 3: %+v", cs.overflow[2].Key) } - o = o.setOverflowEntry(SectionalNeedleValue{Key: 3, Offset: 24, Size: 24}) + cs.setOverflowEntry(3, ToOffset(24), 24) - if o[2].Key != 3 { - t.Fatalf("expecting o[2] has key 3: %+v", o[2].Key) + if cs.overflow[2].Key != 3 { + t.Fatalf("expecting o[2] has key 3: %+v", cs.overflow[2].Key) } - if o[2].Size != 24 { - t.Fatalf("expecting o[2] has size 24: %+v", o[2].Size) + if cs.overflow[2].Size != 24 { + t.Fatalf("expecting o[2] has size 24: %+v", cs.overflow[2].Size) } - o = o.deleteOverflowEntry(4) + cs.deleteOverflowEntry(4) - if len(o) != 4 { - t.Fatalf("expecting 4 entries now: %+v", o) + if len(cs.overflow) != 4 { + t.Fatalf("expecting 4 entries now: %+v", cs.overflow) } - x, _ := o.findOverflowEntry(5) + _, x, _ := cs.findOverflowEntry(5) if x.Key != 5 { t.Fatalf("expecting entry 5 now: %+v", x) } - for i, x := range o { + for i, x := range cs.overflow { println("overflow[", i, "]:", x.Key) } println() - o = o.deleteOverflowEntry(1) + cs.deleteOverflowEntry(1) - for i, x := range o { + for i, x := range cs.overflow { println("overflow[", i, "]:", x.Key) } println() - o = o.setOverflowEntry(SectionalNeedleValue{Key: 4, Offset: 44, Size: 44}) - for i, x := range o { + cs.setOverflowEntry(4, ToOffset(44), 44) + for i, x := range cs.overflow { println("overflow[", i, "]:", x.Key) } println() - o = o.setOverflowEntry(SectionalNeedleValue{Key: 1, Offset: 11, Size: 11}) + cs.setOverflowEntry(1, ToOffset(11), 11) - for i, x := range o { + for i, x := range cs.overflow { println("overflow[", i, "]:", x.Key) } println() diff --git a/weed/storage/needle_map.go b/weed/storage/needle_map.go index 6d815679b..92fc06aae 100644 --- a/weed/storage/needle_map.go +++ b/weed/storage/needle_map.go @@ -14,10 +14,10 @@ import ( type NeedleMapType int const ( - NeedleMapInMemory NeedleMapType = iota - NeedleMapLevelDb - NeedleMapBoltDb - NeedleMapBtree + NeedleMapInMemory NeedleMapType = iota + NeedleMapLevelDb // small memory footprint, 4MB total, 1 write buffer, 3 block buffer + NeedleMapLevelDbMedium // medium memory footprint, 8MB total, 3 write buffer, 5 block buffer + NeedleMapLevelDbLarge // large memory footprint, 12MB total, 4write buffer, 8 block buffer ) type NeedleMapper interface { diff --git a/weed/storage/needle_map_boltdb.go b/weed/storage/needle_map_boltdb.go deleted file mode 100644 index a24c55a32..000000000 --- a/weed/storage/needle_map_boltdb.go +++ /dev/null @@ -1,185 +0,0 @@ -package storage - -import ( - "fmt" - "os" - - "github.com/boltdb/bolt" - - "errors" - "github.com/chrislusf/seaweedfs/weed/glog" - "github.com/chrislusf/seaweedfs/weed/storage/needle" - . "github.com/chrislusf/seaweedfs/weed/storage/types" - "github.com/chrislusf/seaweedfs/weed/util" -) - -type BoltDbNeedleMap struct { - dbFileName string - db *bolt.DB - baseNeedleMapper -} - -var boltdbBucket = []byte("weed") - -var NotFound = errors.New("not found") - -func NewBoltDbNeedleMap(dbFileName string, indexFile *os.File) (m *BoltDbNeedleMap, err error) { - m = &BoltDbNeedleMap{dbFileName: dbFileName} - m.indexFile = indexFile - if !isBoltDbFresh(dbFileName, indexFile) { - glog.V(0).Infof("Start to Generate %s from %s", dbFileName, indexFile.Name()) - generateBoltDbFile(dbFileName, indexFile) - glog.V(0).Infof("Finished Generating %s from %s", dbFileName, indexFile.Name()) - } - glog.V(1).Infof("Opening %s...", dbFileName) - if m.db, err = bolt.Open(dbFileName, 0644, nil); err != nil { - return - } - glog.V(1).Infof("Loading %s...", indexFile.Name()) - mm, indexLoadError := newNeedleMapMetricFromIndexFile(indexFile) - if indexLoadError != nil { - return nil, indexLoadError - } - m.mapMetric = *mm - return -} - -func isBoltDbFresh(dbFileName string, indexFile *os.File) bool { - // normally we always write to index file first - dbLogFile, err := os.Open(dbFileName) - if err != nil { - return false - } - defer dbLogFile.Close() - dbStat, dbStatErr := dbLogFile.Stat() - indexStat, indexStatErr := indexFile.Stat() - if dbStatErr != nil || indexStatErr != nil { - glog.V(0).Infof("Can not stat file: %v and %v", dbStatErr, indexStatErr) - return false - } - - return dbStat.ModTime().After(indexStat.ModTime()) -} - -func generateBoltDbFile(dbFileName string, indexFile *os.File) error { - db, err := bolt.Open(dbFileName, 0644, nil) - if err != nil { - return err - } - defer db.Close() - return WalkIndexFile(indexFile, func(key NeedleId, offset Offset, size uint32) error { - if offset > 0 && size != TombstoneFileSize { - boltDbWrite(db, key, offset, size) - } else { - boltDbDelete(db, key) - } - return nil - }) -} - -func (m *BoltDbNeedleMap) Get(key NeedleId) (element *needle.NeedleValue, ok bool) { - var offset Offset - var size uint32 - bytes := make([]byte, NeedleIdSize) - NeedleIdToBytes(bytes, key) - err := m.db.View(func(tx *bolt.Tx) error { - bucket := tx.Bucket(boltdbBucket) - if bucket == nil { - return fmt.Errorf("Bucket %q not found!", boltdbBucket) - } - - data := bucket.Get(bytes) - - if len(data) == 0 { - return NotFound - } - - if len(data) != OffsetSize+SizeSize { - glog.V(0).Infof("key:%v has wrong data length: %d", key, len(data)) - return fmt.Errorf("key:%v has wrong data length: %d", key, len(data)) - } - - offset = BytesToOffset(data[0:OffsetSize]) - size = util.BytesToUint32(data[OffsetSize : OffsetSize+SizeSize]) - - return nil - }) - - if err != nil { - return nil, false - } - return &needle.NeedleValue{Key: key, Offset: offset, Size: size}, true -} - -func (m *BoltDbNeedleMap) Put(key NeedleId, offset Offset, size uint32) error { - var oldSize uint32 - if oldNeedle, ok := m.Get(key); ok { - oldSize = oldNeedle.Size - } - m.logPut(key, oldSize, size) - // write to index file first - if err := m.appendToIndexFile(key, offset, size); err != nil { - return fmt.Errorf("cannot write to indexfile %s: %v", m.indexFile.Name(), err) - } - return boltDbWrite(m.db, key, offset, size) -} - -func boltDbWrite(db *bolt.DB, - key NeedleId, offset Offset, size uint32) error { - - bytes := make([]byte, NeedleIdSize+OffsetSize+SizeSize) - NeedleIdToBytes(bytes[0:NeedleIdSize], key) - OffsetToBytes(bytes[NeedleIdSize:NeedleIdSize+OffsetSize], offset) - util.Uint32toBytes(bytes[NeedleIdSize+OffsetSize:NeedleIdSize+OffsetSize+SizeSize], size) - - return db.Update(func(tx *bolt.Tx) error { - bucket, err := tx.CreateBucketIfNotExists(boltdbBucket) - if err != nil { - return err - } - - err = bucket.Put(bytes[0:NeedleIdSize], bytes[NeedleIdSize:NeedleIdSize+OffsetSize+SizeSize]) - if err != nil { - return err - } - return nil - }) -} -func boltDbDelete(db *bolt.DB, key NeedleId) error { - bytes := make([]byte, NeedleIdSize) - NeedleIdToBytes(bytes, key) - return db.Update(func(tx *bolt.Tx) error { - bucket, err := tx.CreateBucketIfNotExists(boltdbBucket) - if err != nil { - return err - } - - err = bucket.Delete(bytes) - if err != nil { - return err - } - return nil - }) -} - -func (m *BoltDbNeedleMap) Delete(key NeedleId, offset Offset) error { - if oldNeedle, ok := m.Get(key); ok { - m.logDelete(oldNeedle.Size) - } - // write to index file first - if err := m.appendToIndexFile(key, offset, TombstoneFileSize); err != nil { - return err - } - return boltDbDelete(m.db, key) -} - -func (m *BoltDbNeedleMap) Close() { - m.indexFile.Close() - m.db.Close() -} - -func (m *BoltDbNeedleMap) Destroy() error { - m.Close() - os.Remove(m.indexFile.Name()) - return os.Remove(m.dbFileName) -} diff --git a/weed/storage/needle_map_leveldb.go b/weed/storage/needle_map_leveldb.go index 77d29bd87..4d5280938 100644 --- a/weed/storage/needle_map_leveldb.go +++ b/weed/storage/needle_map_leveldb.go @@ -2,6 +2,7 @@ package storage import ( "fmt" + "github.com/syndtr/goleveldb/leveldb/opt" "os" "path/filepath" @@ -18,7 +19,7 @@ type LevelDbNeedleMap struct { baseNeedleMapper } -func NewLevelDbNeedleMap(dbFileName string, indexFile *os.File) (m *LevelDbNeedleMap, err error) { +func NewLevelDbNeedleMap(dbFileName string, indexFile *os.File, opts *opt.Options) (m *LevelDbNeedleMap, err error) { m = &LevelDbNeedleMap{dbFileName: dbFileName} m.indexFile = indexFile if !isLevelDbFresh(dbFileName, indexFile) { @@ -27,7 +28,8 @@ func NewLevelDbNeedleMap(dbFileName string, indexFile *os.File) (m *LevelDbNeedl glog.V(0).Infof("Finished Generating %s from %s", dbFileName, indexFile.Name()) } glog.V(1).Infof("Opening %s...", dbFileName) - if m.db, err = leveldb.OpenFile(dbFileName, nil); err != nil { + + if m.db, err = leveldb.OpenFile(dbFileName, opts); err != nil { return } glog.V(1).Infof("Loading %s...", indexFile.Name()) @@ -63,7 +65,7 @@ func generateLevelDbFile(dbFileName string, indexFile *os.File) error { } defer db.Close() return WalkIndexFile(indexFile, func(key NeedleId, offset Offset, size uint32) error { - if offset > 0 && size != TombstoneFileSize { + if !offset.IsZero() && size != TombstoneFileSize { levelDbWrite(db, key, offset, size) } else { levelDbDelete(db, key) diff --git a/weed/storage/needle_map_memory.go b/weed/storage/needle_map_memory.go index fa5576c2b..ad3bd3f7a 100644 --- a/weed/storage/needle_map_memory.go +++ b/weed/storage/needle_map_memory.go @@ -50,12 +50,12 @@ func doLoading(file *os.File, nm *NeedleMap) (*NeedleMap, error) { if key > nm.MaximumFileKey { nm.MaximumFileKey = key } - if offset > 0 && size != TombstoneFileSize { + if !offset.IsZero() && size != TombstoneFileSize { nm.FileCounter++ nm.FileByteCounter = nm.FileByteCounter + uint64(size) oldOffset, oldSize := nm.m.Set(NeedleId(key), offset, size) // glog.V(3).Infoln("reading key", key, "offset", offset*NeedlePaddingSize, "size", size, "oldSize", oldSize) - if oldOffset > 0 && oldSize != TombstoneFileSize { + if !oldOffset.IsZero() && oldSize != TombstoneFileSize { nm.DeletionCounter++ nm.DeletionByteCounter = nm.DeletionByteCounter + uint64(oldSize) } diff --git a/weed/storage/types/needle_types.go b/weed/storage/types/needle_types.go index ce4e601e4..b591dd7c3 100644 --- a/weed/storage/types/needle_types.go +++ b/weed/storage/types/needle_types.go @@ -7,16 +7,25 @@ import ( "strconv" ) -type Offset uint32 +type Offset struct { + OffsetHigher + OffsetLower +} + +type OffsetLower struct { + b3 byte + b2 byte + b1 byte + b0 byte // the smaller byte +} + type Cookie uint32 const ( - OffsetSize = 4 SizeSize = 4 // uint32 size - NeedleEntrySize = NeedleIdSize + OffsetSize + SizeSize + NeedleEntrySize = CookieSize + NeedleIdSize + SizeSize TimestampSize = 8 // int64 size NeedlePaddingSize = 8 - MaxPossibleVolumeSize = 4 * 1024 * 1024 * 1024 * 8 TombstoneFileSize = math.MaxUint32 CookieSize = 4 ) @@ -39,15 +48,3 @@ func ParseCookie(cookieString string) (Cookie, error) { } return Cookie(cookie), nil } - -func OffsetToBytes(bytes []byte, offset Offset) { - util.Uint32toBytes(bytes, uint32(offset)) -} - -func Uint32ToOffset(offset uint32) Offset { - return Offset(offset) -} - -func BytesToOffset(bytes []byte) Offset { - return Offset(util.BytesToUint32(bytes[0:4])) -} diff --git a/weed/storage/types/offset_4bytes.go b/weed/storage/types/offset_4bytes.go new file mode 100644 index 000000000..9acd069d3 --- /dev/null +++ b/weed/storage/types/offset_4bytes.go @@ -0,0 +1,63 @@ +// +build !5BytesOffset + +package types + +import ( + "fmt" +) + +type OffsetHigher struct { + // b4 byte +} + +const ( + OffsetSize = 4 + MaxPossibleVolumeSize = 4 * 1024 * 1024 * 1024 * 8 // 32GB +) + +func OffsetToBytes(bytes []byte, offset Offset) { + bytes[3] = offset.b0 + bytes[2] = offset.b1 + bytes[1] = offset.b2 + bytes[0] = offset.b3 +} + +// only for testing, will be removed later. +func Uint32ToOffset(offset uint32) Offset { + return Offset{ + OffsetLower: OffsetLower{ + b0: byte(offset), + b1: byte(offset >> 8), + b2: byte(offset >> 16), + b3: byte(offset >> 24), + }, + } +} + +func BytesToOffset(bytes []byte) Offset { + return Offset{ + OffsetLower: OffsetLower{ + b0: bytes[3], + b1: bytes[2], + b2: bytes[1], + b3: bytes[0], + }, + } +} + +func (offset Offset) IsZero() bool { + return offset.b0 == 0 && offset.b1 == 0 && offset.b2 == 0 && offset.b3 == 0 +} + +func ToOffset(offset int64) Offset { + smaller := uint32(offset / int64(NeedlePaddingSize)) + return Uint32ToOffset(smaller) +} + +func (offset Offset) ToAcutalOffset() (actualOffset int64) { + return (int64(offset.b0) + int64(offset.b1)<<8 + int64(offset.b2)<<16 + int64(offset.b3)<<24) * int64(NeedlePaddingSize) +} + +func (offset Offset) String() string { + return fmt.Sprintf("%d", int64(offset.b0)+int64(offset.b1)<<8+int64(offset.b2)<<16+int64(offset.b3)<<24) +} diff --git a/weed/storage/types/offset_5bytes.go b/weed/storage/types/offset_5bytes.go new file mode 100644 index 000000000..f57e4f6d4 --- /dev/null +++ b/weed/storage/types/offset_5bytes.go @@ -0,0 +1,80 @@ +// +build 5BytesOffset + +package types + +import ( + "fmt" +) + +type OffsetHigher struct { + b4 byte +} + +const ( + OffsetSize = 4 + 1 + MaxPossibleVolumeSize = 4 * 1024 * 1024 * 1024 * 8 * 256 /* 256 is from the extra byte */ // 8TB +) + +func OffsetToBytes(bytes []byte, offset Offset) { + bytes[4] = offset.b4 + bytes[3] = offset.b0 + bytes[2] = offset.b1 + bytes[1] = offset.b2 + bytes[0] = offset.b3 +} + +// only for testing, will be removed later. +func Uint32ToOffset(offset uint32) Offset { + return Offset{ + OffsetHigher: OffsetHigher{ + b4: byte(offset >> 32), + }, + OffsetLower: OffsetLower{ + b0: byte(offset), + b1: byte(offset >> 8), + b2: byte(offset >> 16), + b3: byte(offset >> 24), + }, + } +} + +func BytesToOffset(bytes []byte) Offset { + return Offset{ + OffsetHigher: OffsetHigher{ + b4: bytes[4], + }, + OffsetLower: OffsetLower{ + b0: bytes[3], + b1: bytes[2], + b2: bytes[1], + b3: bytes[0], + }, + } +} + +func (offset Offset) IsZero() bool { + return offset.b0 == 0 && offset.b1 == 0 && offset.b2 == 0 && offset.b3 == 0 && offset.b4 == 0 +} + +func ToOffset(offset int64) Offset { + smaller := offset / int64(NeedlePaddingSize) + return Offset{ + OffsetHigher: OffsetHigher{ + b4: byte(smaller >> 32), + }, + OffsetLower: OffsetLower{ + b0: byte(smaller), + b1: byte(smaller >> 8), + b2: byte(smaller >> 16), + b3: byte(smaller >> 24), + }, + } +} + +func (offset Offset) ToAcutalOffset() (actualOffset int64) { + return (int64(offset.b0) + int64(offset.b1)<<8 + int64(offset.b2)<<16 + int64(offset.b3)<<24 + int64(offset.b4)<<32) * int64(NeedlePaddingSize) +} + +func (offset Offset) String() string { + return fmt.Sprintf("%d", int64(offset.b0)+int64(offset.b1)<<8+int64(offset.b2)<<16+int64(offset.b3)<<24+int64(offset.b4)<<32) +} diff --git a/weed/storage/volume_checking.go b/weed/storage/volume_checking.go index 12c282be9..1ac73d3d3 100644 --- a/weed/storage/volume_checking.go +++ b/weed/storage/volume_checking.go @@ -26,10 +26,10 @@ func CheckVolumeDataIntegrity(v *Volume, indexFile *os.File) error { return fmt.Errorf("readLastIndexEntry %s failed: %v", indexFile.Name(), e) } key, offset, size := IdxFileEntry(lastIdxEntry) - if offset == 0 || size == TombstoneFileSize { + if offset.IsZero() || size == TombstoneFileSize { return nil } - if e = verifyNeedleIntegrity(v.dataFile, v.Version(), int64(offset)*NeedlePaddingSize, key, size); e != nil { + if e = verifyNeedleIntegrity(v.dataFile, v.Version(), offset.ToAcutalOffset(), key, size); e != nil { return fmt.Errorf("verifyNeedleIntegrity %s failed: %v", indexFile.Name(), e) } diff --git a/weed/storage/volume_follow.go b/weed/storage/volume_follow.go index b8353f9d1..8512ec932 100644 --- a/weed/storage/volume_follow.go +++ b/weed/storage/volume_follow.go @@ -110,7 +110,7 @@ func (v *Volume) findLastAppendAtNs() (uint64, error) { if err != nil { return 0, err } - if offset == 0 { + if offset.IsZero() { return 0, nil } return v.readAppendAtNs(offset) @@ -119,26 +119,26 @@ func (v *Volume) findLastAppendAtNs() (uint64, error) { func (v *Volume) locateLastAppendEntry() (Offset, error) { indexFile, e := os.OpenFile(v.FileName()+".idx", os.O_RDONLY, 0644) if e != nil { - return 0, fmt.Errorf("cannot read %s.idx: %v", v.FileName(), e) + return Offset{}, fmt.Errorf("cannot read %s.idx: %v", v.FileName(), e) } defer indexFile.Close() fi, err := indexFile.Stat() if err != nil { - return 0, fmt.Errorf("file %s stat error: %v", indexFile.Name(), err) + return Offset{}, fmt.Errorf("file %s stat error: %v", indexFile.Name(), err) } fileSize := fi.Size() if fileSize%NeedleEntrySize != 0 { - return 0, fmt.Errorf("unexpected file %s size: %d", indexFile.Name(), fileSize) + return Offset{}, fmt.Errorf("unexpected file %s size: %d", indexFile.Name(), fileSize) } if fileSize == 0 { - return 0, nil + return Offset{}, nil } bytes := make([]byte, NeedleEntrySize) n, e := indexFile.ReadAt(bytes, fileSize-NeedleEntrySize) if n != NeedleEntrySize { - return 0, fmt.Errorf("file %s read error: %v", indexFile.Name(), e) + return Offset{}, fmt.Errorf("file %s read error: %v", indexFile.Name(), e) } _, offset, _ := IdxFileEntry(bytes) @@ -147,13 +147,13 @@ func (v *Volume) locateLastAppendEntry() (Offset, error) { func (v *Volume) readAppendAtNs(offset Offset) (uint64, error) { - n, bodyLength, err := ReadNeedleHeader(v.dataFile, v.SuperBlock.version, int64(offset)*NeedlePaddingSize) + n, bodyLength, err := ReadNeedleHeader(v.dataFile, v.SuperBlock.version, offset.ToAcutalOffset()) if err != nil { return 0, fmt.Errorf("ReadNeedleHeader: %v", err) } - err = n.ReadNeedleBody(v.dataFile, v.SuperBlock.version, int64(offset)*NeedlePaddingSize+int64(NeedleEntrySize), bodyLength) + err = n.ReadNeedleBody(v.dataFile, v.SuperBlock.version, offset.ToAcutalOffset()+int64(NeedleEntrySize), bodyLength) if err != nil { - return 0, fmt.Errorf("ReadNeedleBody offset %d, bodyLength %d: %v", int64(offset)*NeedlePaddingSize, bodyLength, err) + return 0, fmt.Errorf("ReadNeedleBody offset %d, bodyLength %d: %v", offset.ToAcutalOffset(), bodyLength, err) } return n.AppendAtNs, nil @@ -189,7 +189,7 @@ func (v *Volume) BinarySearchByAppendAtNs(sinceNs uint64) (offset Offset, isLast m := (l + h) / 2 if m == entryCount { - return 0, true, nil + return Offset{}, true, nil } // read the appendAtNs for entry m @@ -214,7 +214,7 @@ func (v *Volume) BinarySearchByAppendAtNs(sinceNs uint64) (offset Offset, isLast } if l == entryCount { - return 0, true, nil + return Offset{}, true, nil } offset, err = v.readAppendAtNsForIndexEntry(indexFile, bytes, l) @@ -226,7 +226,7 @@ func (v *Volume) BinarySearchByAppendAtNs(sinceNs uint64) (offset Offset, isLast // bytes is of size NeedleEntrySize func (v *Volume) readAppendAtNsForIndexEntry(indexFile *os.File, bytes []byte, m int64) (Offset, error) { if _, readErr := indexFile.ReadAt(bytes, m*NeedleEntrySize); readErr != nil && readErr != io.EOF { - return 0, readErr + return Offset{}, readErr } _, offset, _ := IdxFileEntry(bytes) return offset, nil @@ -247,7 +247,7 @@ func (scanner *VolumeFileScanner4GenIdx) ReadNeedleBody() bool { func (scanner *VolumeFileScanner4GenIdx) VisitNeedle(n *Needle, offset int64) error { if n.Size > 0 && n.Size != TombstoneFileSize { - return scanner.v.nm.Put(n.Id, Offset(offset/NeedlePaddingSize), n.Size) + return scanner.v.nm.Put(n.Id, ToOffset(offset), n.Size) } - return scanner.v.nm.Delete(n.Id, Offset(offset/NeedlePaddingSize)) + return scanner.v.nm.Delete(n.Id, ToOffset(offset)) } diff --git a/weed/storage/volume_loading.go b/weed/storage/volume_loading.go index 37a6e07b2..14013b302 100644 --- a/weed/storage/volume_loading.go +++ b/weed/storage/volume_loading.go @@ -2,6 +2,7 @@ package storage import ( "fmt" + "github.com/syndtr/goleveldb/leveldb/opt" "os" "time" @@ -82,18 +83,30 @@ func (v *Volume) load(alsoLoadIndex bool, createDatIfMissing bool, needleMapKind } case NeedleMapLevelDb: glog.V(0).Infoln("loading leveldb", fileName+".ldb") - if v.nm, e = NewLevelDbNeedleMap(fileName+".ldb", indexFile); e != nil { + opts := &opt.Options{ + BlockCacheCapacity: 2 * 1024 * 1024, // default value is 8MiB + WriteBuffer: 1 * 1024 * 1024, // default value is 4MiB + } + if v.nm, e = NewLevelDbNeedleMap(fileName+".ldb", indexFile, opts); e != nil { glog.V(0).Infof("loading leveldb %s error: %v", fileName+".ldb", e) } - case NeedleMapBoltDb: - glog.V(0).Infoln("loading boltdb", fileName+".bdb") - if v.nm, e = NewBoltDbNeedleMap(fileName+".bdb", indexFile); e != nil { - glog.V(0).Infof("loading boltdb %s error: %v", fileName+".bdb", e) + case NeedleMapLevelDbMedium: + glog.V(0).Infoln("loading leveldb medium", fileName+".ldb") + opts := &opt.Options{ + BlockCacheCapacity: 4 * 1024 * 1024, // default value is 8MiB + WriteBuffer: 2 * 1024 * 1024, // default value is 4MiB + } + if v.nm, e = NewLevelDbNeedleMap(fileName+".ldb", indexFile, opts); e != nil { + glog.V(0).Infof("loading leveldb %s error: %v", fileName+".ldb", e) } - case NeedleMapBtree: - glog.V(0).Infoln("loading index", fileName+".idx", "to btree readonly", v.readOnly) - if v.nm, e = LoadBtreeNeedleMap(indexFile); e != nil { - glog.V(0).Infof("loading index %s to btree error: %v", fileName+".idx", e) + case NeedleMapLevelDbLarge: + glog.V(0).Infoln("loading leveldb large", fileName+".ldb") + opts := &opt.Options{ + BlockCacheCapacity: 8 * 1024 * 1024, // default value is 8MiB + WriteBuffer: 4 * 1024 * 1024, // default value is 4MiB + } + if v.nm, e = NewLevelDbNeedleMap(fileName+".ldb", indexFile, opts); e != nil { + glog.V(0).Infof("loading leveldb %s error: %v", fileName+".ldb", e) } } } diff --git a/weed/storage/volume_read_write.go b/weed/storage/volume_read_write.go index 5366a547d..93f4ed1c1 100644 --- a/weed/storage/volume_read_write.go +++ b/weed/storage/volume_read_write.go @@ -21,9 +21,9 @@ func (v *Volume) isFileUnchanged(n *Needle) bool { return false } nv, ok := v.nm.Get(n.Id) - if ok && nv.Offset > 0 { + if ok && !nv.Offset.IsZero() { oldNeedle := new(Needle) - err := oldNeedle.ReadData(v.dataFile, int64(nv.Offset)*NeedlePaddingSize, nv.Size, v.Version()) + err := oldNeedle.ReadData(v.dataFile, nv.Offset.ToAcutalOffset(), nv.Size, v.Version()) if err != nil { glog.V(0).Infof("Failed to check updated file %v", err) return false @@ -96,8 +96,8 @@ func (v *Volume) writeNeedle(n *Needle) (offset uint64, size uint32, err error) } nv, ok := v.nm.Get(n.Id) - if !ok || uint64(nv.Offset)*NeedlePaddingSize < offset { - if err = v.nm.Put(n.Id, Offset(offset/NeedlePaddingSize), n.Size); err != nil { + if !ok || uint64(nv.Offset.ToAcutalOffset()) < offset { + if err = v.nm.Put(n.Id, ToOffset(int64(offset)), n.Size); err != nil { glog.V(4).Infof("failed to save in needle map %d: %v", n.Id, err) } } @@ -124,7 +124,7 @@ func (v *Volume) deleteNeedle(n *Needle) (uint32, error) { if err != nil { return size, err } - if err = v.nm.Delete(n.Id, Offset(offset/NeedlePaddingSize)); err != nil { + if err = v.nm.Delete(n.Id, ToOffset(int64(offset))); err != nil { return size, err } return size, err @@ -135,10 +135,10 @@ func (v *Volume) deleteNeedle(n *Needle) (uint32, error) { // read fills in Needle content by looking up n.Id from NeedleMapper func (v *Volume) readNeedle(n *Needle) (int, error) { nv, ok := v.nm.Get(n.Id) - if !ok || nv.Offset == 0 { + if !ok || nv.Offset.IsZero() { v.compactingWg.Wait() nv, ok = v.nm.Get(n.Id) - if !ok || nv.Offset == 0 { + if !ok || nv.Offset.IsZero() { return -1, ErrorNotFound } } @@ -148,7 +148,7 @@ func (v *Volume) readNeedle(n *Needle) (int, error) { if nv.Size == 0 { return 0, nil } - err := n.ReadData(v.dataFile, int64(nv.Offset)*NeedlePaddingSize, nv.Size, v.Version()) + err := n.ReadData(v.dataFile, nv.Offset.ToAcutalOffset(), nv.Size, v.Version()) if err != nil { return 0, err } diff --git a/weed/storage/volume_vacuum.go b/weed/storage/volume_vacuum.go index b575277cd..b550edb80 100644 --- a/weed/storage/volume_vacuum.go +++ b/weed/storage/volume_vacuum.go @@ -201,13 +201,13 @@ func (v *Volume) makeupDiff(newDatFileName, newIdxFileName, oldDatFileName, oldI } //updated needle - if increIdxEntry.offset != 0 && increIdxEntry.size != 0 && increIdxEntry.size != TombstoneFileSize { + if !increIdxEntry.offset.IsZero() && increIdxEntry.size != 0 && increIdxEntry.size != TombstoneFileSize { //even the needle cache in memory is hit, the need_bytes is correct - glog.V(4).Infof("file %d offset %d size %d", key, int64(increIdxEntry.offset)*NeedlePaddingSize, increIdxEntry.size) + glog.V(4).Infof("file %d offset %d size %d", key, increIdxEntry.offset.ToAcutalOffset(), increIdxEntry.size) var needleBytes []byte - needleBytes, err = ReadNeedleBlob(oldDatFile, int64(increIdxEntry.offset)*NeedlePaddingSize, increIdxEntry.size, v.Version()) + needleBytes, err = ReadNeedleBlob(oldDatFile, increIdxEntry.offset.ToAcutalOffset(), increIdxEntry.size, v.Version()) if err != nil { - return fmt.Errorf("ReadNeedleBlob %s key %d offset %d size %d failed: %v", oldDatFile.Name(), key, int64(increIdxEntry.offset)*NeedlePaddingSize, increIdxEntry.size, err) + return fmt.Errorf("ReadNeedleBlob %s key %d offset %d size %d failed: %v", oldDatFile.Name(), key, increIdxEntry.offset.ToAcutalOffset(), increIdxEntry.size, err) } dst.Write(needleBytes) util.Uint32toBytes(idxEntryBytes[8:12], uint32(offset/NeedlePaddingSize)) @@ -261,8 +261,8 @@ func (scanner *VolumeFileScanner4Vacuum) VisitNeedle(n *Needle, offset int64) er } nv, ok := scanner.v.nm.Get(n.Id) glog.V(4).Infoln("needle expected offset ", offset, "ok", ok, "nv", nv) - if ok && int64(nv.Offset)*NeedlePaddingSize == offset && nv.Size > 0 && nv.Size != TombstoneFileSize { - if err := scanner.nm.Put(n.Id, Offset(scanner.newOffset/NeedlePaddingSize), n.Size); err != nil { + if ok && nv.Offset.ToAcutalOffset() == offset && nv.Size > 0 && nv.Size != TombstoneFileSize { + if err := scanner.nm.Put(n.Id, ToOffset(scanner.newOffset), n.Size); err != nil { return fmt.Errorf("cannot put needle: %s", err) } if _, _, _, err := n.Append(scanner.dst, scanner.v.Version()); err != nil { @@ -325,7 +325,7 @@ func (v *Volume) copyDataBasedOnIndexFile(dstName, idxName string) (err error) { newOffset := int64(v.SuperBlock.BlockSize()) WalkIndexFile(oldIndexFile, func(key NeedleId, offset Offset, size uint32) error { - if offset == 0 || size == TombstoneFileSize { + if offset.IsZero() || size == TombstoneFileSize { return nil } @@ -335,7 +335,7 @@ func (v *Volume) copyDataBasedOnIndexFile(dstName, idxName string) (err error) { } n := new(Needle) - err := n.ReadData(v.dataFile, int64(offset)*NeedlePaddingSize, size, v.Version()) + err := n.ReadData(v.dataFile, offset.ToAcutalOffset(), size, v.Version()) if err != nil { return nil } @@ -346,7 +346,7 @@ func (v *Volume) copyDataBasedOnIndexFile(dstName, idxName string) (err error) { glog.V(4).Infoln("needle expected offset ", offset, "ok", ok, "nv", nv) if nv.Offset == offset && nv.Size > 0 { - if err = nm.Put(n.Id, Offset(newOffset/NeedlePaddingSize), n.Size); err != nil { + if err = nm.Put(n.Id, ToOffset(newOffset), n.Size); err != nil { return fmt.Errorf("cannot put needle: %s", err) } if _, _, _, err = n.Append(dst, v.Version()); err != nil { diff --git a/weed/tools/read_index.go b/weed/tools/read_index.go deleted file mode 100644 index d53f489ea..000000000 --- a/weed/tools/read_index.go +++ /dev/null @@ -1,29 +0,0 @@ -package main - -import ( - "flag" - "fmt" - "log" - "os" - - "github.com/chrislusf/seaweedfs/weed/storage" - "github.com/chrislusf/seaweedfs/weed/storage/types" -) - -var ( - indexFileName = flag.String("file", "", ".idx file to analyze") -) - -func main() { - flag.Parse() - indexFile, err := os.OpenFile(*indexFileName, os.O_RDONLY, 0644) - if err != nil { - log.Fatalf("Create Volume Index [ERROR] %s\n", err) - } - defer indexFile.Close() - - storage.WalkIndexFile(indexFile, func(key types.NeedleId, offset types.Offset, size uint32) error { - fmt.Printf("key %d, offset %d, size %d, nextOffset %d\n", key, offset*8, size, int64(offset)*types.NeedlePaddingSize+int64(size)) - return nil - }) -} diff --git a/weed/util/constants.go b/weed/util/constants.go index 0a4980e6f..aa8399523 100644 --- a/weed/util/constants.go +++ b/weed/util/constants.go @@ -1,5 +1,9 @@ package util -const ( - VERSION = "1.28" +import ( + "fmt" +) + +var ( + VERSION = fmt.Sprintf("%s %d.%d", sizeLimit, 1, 29) ) diff --git a/weed/util/constants_4bytes.go b/weed/util/constants_4bytes.go new file mode 100644 index 000000000..a29d9d3b0 --- /dev/null +++ b/weed/util/constants_4bytes.go @@ -0,0 +1,8 @@ +// +build !5BytesOffset + +package util + +const ( + sizeLimit = "30GB" + VolumeSizeLimitGB = 30 +) diff --git a/weed/util/constants_5bytes.go b/weed/util/constants_5bytes.go new file mode 100644 index 000000000..91ce4066f --- /dev/null +++ b/weed/util/constants_5bytes.go @@ -0,0 +1,8 @@ +// +build 5BytesOffset + +package util + +const ( + sizeLimit = "8000GB" + VolumeSizeLimitGB = 8000 +) |
