aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--go.mod2
-rw-r--r--go.sum2
-rw-r--r--k8s/seaweedfs/Chart.yaml4
-rw-r--r--k8s/seaweedfs/values.yaml2
-rw-r--r--weed/filer/filerstore_wrapper.go2
-rw-r--r--weed/filer/reader_at.go6
-rw-r--r--weed/filesys/dir.go2
-rw-r--r--weed/filesys/dirty_pages_temp_file.go27
-rw-r--r--weed/filesys/dirty_pages_temp_interval.go48
-rw-r--r--weed/filesys/filehandle.go8
-rw-r--r--weed/topology/node.go4
-rw-r--r--weed/topology/topology.go8
-rw-r--r--weed/topology/topology_event_handling.go4
-rw-r--r--weed/topology/volume_layout.go6
-rw-r--r--weed/util/constants.go2
15 files changed, 80 insertions, 47 deletions
diff --git a/go.mod b/go.mod
index 6b2af25d7..b0aba4c94 100644
--- a/go.mod
+++ b/go.mod
@@ -15,7 +15,7 @@ require (
github.com/buraksezer/consistent v0.0.0-20191006190839-693edf70fd72
github.com/bwmarrin/snowflake v0.3.0
github.com/cespare/xxhash v1.1.0
- github.com/chrislusf/raft v1.0.6
+ github.com/chrislusf/raft v1.0.7
github.com/coreos/go-semver v0.3.0 // indirect
github.com/dgrijalva/jwt-go v3.2.0+incompatible
github.com/disintegration/imaging v1.6.2
diff --git a/go.sum b/go.sum
index f2cc6d26d..b02192d9c 100644
--- a/go.sum
+++ b/go.sum
@@ -159,6 +159,8 @@ github.com/chrislusf/raft v1.0.5 h1:g8GxKCSStfm0/bGBDpNEbmEXL6MJkpXX+NI0ksbX5D4=
github.com/chrislusf/raft v1.0.5/go.mod h1:Ep5DP+mJSosjfKiix1uU7Lc2Df/SX4oGJEpZlXH5l68=
github.com/chrislusf/raft v1.0.6 h1:wunb85WWhMKhNRn7EmdIw35D4Lmew0ZJv8oYDizR/+Y=
github.com/chrislusf/raft v1.0.6/go.mod h1:Ep5DP+mJSosjfKiix1uU7Lc2Df/SX4oGJEpZlXH5l68=
+github.com/chrislusf/raft v1.0.7 h1:reybAIwnQOTSgTj1YgflbJFWLSN0KVQSxe8gDZYa04o=
+github.com/chrislusf/raft v1.0.7/go.mod h1:Ep5DP+mJSosjfKiix1uU7Lc2Df/SX4oGJEpZlXH5l68=
github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI=
github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI=
github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU=
diff --git a/k8s/seaweedfs/Chart.yaml b/k8s/seaweedfs/Chart.yaml
index 91ae0a251..b27e50f57 100644
--- a/k8s/seaweedfs/Chart.yaml
+++ b/k8s/seaweedfs/Chart.yaml
@@ -1,5 +1,5 @@
apiVersion: v1
description: SeaweedFS
name: seaweedfs
-appVersion: "2.45"
-version: 2.45
+appVersion: "2.47"
+version: 2.47
diff --git a/k8s/seaweedfs/values.yaml b/k8s/seaweedfs/values.yaml
index 82dbc60ee..8c2c11a6b 100644
--- a/k8s/seaweedfs/values.yaml
+++ b/k8s/seaweedfs/values.yaml
@@ -4,7 +4,7 @@ global:
registry: ""
repository: ""
imageName: chrislusf/seaweedfs
- # imageTag: "2.45" - started using {.Chart.appVersion}
+ # imageTag: "2.47" - started using {.Chart.appVersion}
imagePullPolicy: IfNotPresent
imagePullSecrets: imagepullsecret
restartPolicy: Always
diff --git a/weed/filer/filerstore_wrapper.go b/weed/filer/filerstore_wrapper.go
index cd7c0bea3..5175a87a1 100644
--- a/weed/filer/filerstore_wrapper.go
+++ b/weed/filer/filerstore_wrapper.go
@@ -150,7 +150,7 @@ func (fsw *FilerStoreWrapper) FindEntry(ctx context.Context, fp util.FullPath) (
}()
entry, err = actualStore.FindEntry(ctx, fp)
- glog.V(4).Infof("FindEntry %s: %v", fp, err)
+ // glog.V(4).Infof("FindEntry %s: %v", fp, err)
if err != nil {
return nil, err
}
diff --git a/weed/filer/reader_at.go b/weed/filer/reader_at.go
index b03b3bbb4..458cf88be 100644
--- a/weed/filer/reader_at.go
+++ b/weed/filer/reader_at.go
@@ -106,7 +106,7 @@ func (c *ChunkReadAt) ReadAt(p []byte, offset int64) (n int, err error) {
c.readerLock.Lock()
defer c.readerLock.Unlock()
- glog.V(4).Infof("ReadAt [%d,%d) of total file size %d bytes %d chunk views", offset, offset+int64(len(p)), c.fileSize, len(c.chunkViews))
+ // glog.V(4).Infof("ReadAt [%d,%d) of total file size %d bytes %d chunk views", offset, offset+int64(len(p)), c.fileSize, len(c.chunkViews))
return c.doReadAt(p, offset)
}
@@ -137,7 +137,7 @@ func (c *ChunkReadAt) doReadAt(p []byte, offset int64) (n int, err error) {
if chunkStart >= chunkStop {
continue
}
- glog.V(4).Infof("read [%d,%d), %d/%d chunk %s [%d,%d)", chunkStart, chunkStop, i, len(c.chunkViews), chunk.FileId, chunk.LogicOffset-chunk.Offset, chunk.LogicOffset-chunk.Offset+int64(chunk.Size))
+ // glog.V(4).Infof("read [%d,%d), %d/%d chunk %s [%d,%d)", chunkStart, chunkStop, i, len(c.chunkViews), chunk.FileId, chunk.LogicOffset-chunk.Offset, chunk.LogicOffset-chunk.Offset+int64(chunk.Size))
var buffer []byte
bufferOffset := chunkStart - chunk.LogicOffset + chunk.Offset
bufferLength := chunkStop - chunkStart
@@ -152,7 +152,7 @@ func (c *ChunkReadAt) doReadAt(p []byte, offset int64) (n int, err error) {
startOffset, remaining = startOffset+int64(copied), remaining-int64(copied)
}
- glog.V(4).Infof("doReadAt [%d,%d), n:%v, err:%v", offset, offset+int64(len(p)), n, err)
+ // glog.V(4).Infof("doReadAt [%d,%d), n:%v, err:%v", offset, offset+int64(len(p)), n, err)
if err == nil && remaining > 0 && c.fileSize > startOffset {
delta := int(min(remaining, c.fileSize-startOffset))
diff --git a/weed/filesys/dir.go b/weed/filesys/dir.go
index 09d5fd449..72e41247f 100644
--- a/weed/filesys/dir.go
+++ b/weed/filesys/dir.go
@@ -296,7 +296,7 @@ func (dir *Dir) Mkdir(ctx context.Context, req *fuse.MkdirRequest) (fs.Node, err
func (dir *Dir) Lookup(ctx context.Context, req *fuse.LookupRequest, resp *fuse.LookupResponse) (node fs.Node, err error) {
dirPath := util.FullPath(dir.FullPath())
- glog.V(4).Infof("dir Lookup %s: %s by %s", dirPath, req.Name, req.Header.String())
+ // glog.V(4).Infof("dir Lookup %s: %s by %s", dirPath, req.Name, req.Header.String())
fullFilePath := dirPath.Child(req.Name)
visitErr := meta_cache.EnsureVisited(dir.wfs.metaCache, dir.wfs, dirPath)
diff --git a/weed/filesys/dirty_pages_temp_file.go b/weed/filesys/dirty_pages_temp_file.go
index a04efb6aa..58c150acf 100644
--- a/weed/filesys/dirty_pages_temp_file.go
+++ b/weed/filesys/dirty_pages_temp_file.go
@@ -56,18 +56,23 @@ func (pages *TempFileDirtyPages) AddPage(offset int64, data []byte) {
}
pages.tf = tf
pages.writtenIntervals.tempFile = tf
+ pages.writtenIntervals.lastOffset = 0
}
- writtenOffset := pages.writtenIntervals.TotalSize()
+ writtenOffset := pages.writtenIntervals.lastOffset
+ dataSize := int64(len(data))
- glog.V(4).Infof("%s AddPage %v at %d [%d,%d)", pages.f.fullpath(), pages.tf.Name(), writtenOffset, offset, offset+int64(len(data)))
+ // glog.V(4).Infof("%s AddPage %v at %d [%d,%d)", pages.f.fullpath(), pages.tf.Name(), writtenOffset, offset, offset+dataSize)
if _, err := pages.tf.WriteAt(data, writtenOffset); err != nil {
pages.lastErr = err
} else {
pages.writtenIntervals.AddInterval(writtenOffset, len(data), offset)
+ pages.writtenIntervals.lastOffset += dataSize
}
+ // pages.writtenIntervals.debug()
+
return
}
@@ -81,6 +86,11 @@ func (pages *TempFileDirtyPages) FlushData() error {
pages.pageAddLock.Lock()
defer pages.pageAddLock.Unlock()
if pages.tf != nil {
+
+ pages.writtenIntervals.tempFile = nil
+ pages.writtenIntervals.lists = nil
+
+ pages.tf.Close()
os.Remove(pages.tf.Name())
pages.tf = nil
}
@@ -91,15 +101,16 @@ func (pages *TempFileDirtyPages) saveExistingPagesToStorage() {
pageSize := pages.f.wfs.option.ChunkSizeLimit
- uploadedSize := int64(0)
+ // glog.V(4).Infof("%v saveExistingPagesToStorage %d lists", pages.f.Name, len(pages.writtenIntervals.lists))
+
for _, list := range pages.writtenIntervals.lists {
- for {
- start, stop := max(list.Offset(), uploadedSize), min(list.Offset()+list.Size(), uploadedSize+pageSize)
+ listStopOffset := list.Offset() + list.Size()
+ for uploadedOffset:=int64(0); uploadedOffset < listStopOffset; uploadedOffset += pageSize {
+ start, stop := max(list.Offset(), uploadedOffset), min(listStopOffset, uploadedOffset+pageSize)
if start >= stop {
- break
+ continue
}
- uploadedSize = stop
- glog.V(4).Infof("uploading %v [%d,%d)", pages.f.Name, start, stop)
+ // glog.V(4).Infof("uploading %v [%d,%d) %d/%d", pages.f.Name, start, stop, i, len(pages.writtenIntervals.lists))
pages.saveToStorage(list.ToReader(start, stop), start, stop-start)
}
}
diff --git a/weed/filesys/dirty_pages_temp_interval.go b/weed/filesys/dirty_pages_temp_interval.go
index f423b0e85..2d22845e2 100644
--- a/weed/filesys/dirty_pages_temp_interval.go
+++ b/weed/filesys/dirty_pages_temp_interval.go
@@ -1,8 +1,8 @@
package filesys
import (
- "github.com/chrislusf/seaweedfs/weed/glog"
"io"
+ "log"
"os"
)
@@ -20,8 +20,9 @@ type WrittenIntervalLinkedList struct {
}
type WrittenContinuousIntervals struct {
- tempFile *os.File
- lists []*WrittenIntervalLinkedList
+ tempFile *os.File
+ lastOffset int64
+ lists []*WrittenIntervalLinkedList
}
func (list *WrittenIntervalLinkedList) Offset() int64 {
@@ -95,6 +96,21 @@ func (list *WrittenIntervalLinkedList) subList(start, stop int64) *WrittenInterv
}
}
+func (c *WrittenContinuousIntervals) debug() {
+ log.Printf("++")
+ for _, l := range c.lists {
+ log.Printf("++++")
+ for t := l.Head; ; t = t.Next {
+ log.Printf("[%d,%d) => [%d,%d) %d", t.DataOffset, t.DataOffset+t.Size, t.TempOffset, t.TempOffset+t.Size, t.Size)
+ if t.Next == nil {
+ break
+ }
+ }
+ log.Printf("----")
+ }
+ log.Printf("--")
+}
+
func (c *WrittenContinuousIntervals) AddInterval(tempOffset int64, dataSize int, dataOffset int64) {
interval := &WrittenIntervalNode{DataOffset: dataOffset, TempOffset: tempOffset, Size: int64(dataSize)}
@@ -223,6 +239,7 @@ func (l *WrittenIntervalLinkedList) ToReader(start int64, stop int64) io.Reader
for t := l.Head; ; t = t.Next {
startOffset, stopOffset := max(t.DataOffset, start), min(t.DataOffset+t.Size, stop)
if startOffset < stopOffset {
+ // log.Printf("ToReader read [%d,%d) from [%d,%d) %d", t.DataOffset, t.DataOffset+t.Size, t.TempOffset, t.TempOffset+t.Size, t.Size)
readers = append(readers, newFileSectionReader(l.tempFile, startOffset-t.DataOffset+t.TempOffset, startOffset, stopOffset-startOffset))
}
if t.Next == nil {
@@ -236,29 +253,32 @@ func (l *WrittenIntervalLinkedList) ToReader(start int64, stop int64) io.Reader
}
type FileSectionReader struct {
- file *os.File
- Offset int64
- dataStart int64
- dataStop int64
+ file *os.File
+ tempStartOffset int64
+ Offset int64
+ dataStart int64
+ dataStop int64
}
var _ = io.Reader(&FileSectionReader{})
func newFileSectionReader(tempfile *os.File, offset int64, dataOffset int64, size int64) *FileSectionReader {
return &FileSectionReader{
- file: tempfile,
- Offset: offset,
- dataStart: dataOffset,
- dataStop: dataOffset + size,
+ file: tempfile,
+ tempStartOffset: offset,
+ Offset: offset,
+ dataStart: dataOffset,
+ dataStop: dataOffset + size,
}
}
func (f *FileSectionReader) Read(p []byte) (n int, err error) {
- dataLen := min(f.dataStop-f.Offset, int64(len(p)))
- if dataLen < 0 {
+ remaining := (f.dataStop - f.dataStart) - (f.Offset - f.tempStartOffset)
+ if remaining <= 0 {
return 0, io.EOF
}
- glog.V(4).Infof("reading %v [%d,%d)", f.file.Name(), f.Offset, f.Offset+dataLen)
+ dataLen := min(remaining, int64(len(p)))
+ // glog.V(4).Infof("reading [%d,%d) from %v [%d,%d)/[%d,%d) %d", f.Offset-f.tempStartOffset+f.dataStart, f.Offset-f.tempStartOffset+f.dataStart+dataLen, f.file.Name(), f.Offset, f.Offset+dataLen, f.tempStartOffset, f.tempStartOffset+f.dataStop-f.dataStart, f.dataStop-f.dataStart)
n, err = f.file.ReadAt(p[:dataLen], f.Offset)
if n > 0 {
f.Offset += int64(n)
diff --git a/weed/filesys/filehandle.go b/weed/filesys/filehandle.go
index 1245cce71..88cfe45f0 100644
--- a/weed/filesys/filehandle.go
+++ b/weed/filesys/filehandle.go
@@ -38,8 +38,8 @@ type FileHandle struct {
func newFileHandle(file *File, uid, gid uint32, writeOnly bool) *FileHandle {
fh := &FileHandle{
f: file,
- dirtyPages: newContinuousDirtyPages(file, writeOnly),
- /// dirtyPages: newTempFileDirtyPages(file, writeOnly),
+ // dirtyPages: newContinuousDirtyPages(file, writeOnly),
+ dirtyPages: newTempFileDirtyPages(file, writeOnly),
Uid: uid,
Gid: gid,
}
@@ -149,7 +149,7 @@ func (fh *FileHandle) readFromChunks(buff []byte, offset int64) (int64, error) {
glog.Errorf("file handle read %s: %v", fileFullPath, err)
}
- glog.V(4).Infof("file handle read %s [%d,%d] %d : %v", fileFullPath, offset, offset+int64(totalRead), totalRead, err)
+ // glog.V(4).Infof("file handle read %s [%d,%d] %d : %v", fileFullPath, offset, offset+int64(totalRead), totalRead, err)
return int64(totalRead), err
}
@@ -175,7 +175,7 @@ func (fh *FileHandle) Write(ctx context.Context, req *fuse.WriteRequest, resp *f
entry.Content = nil
entry.Attributes.FileSize = uint64(max(req.Offset+int64(len(data)), int64(entry.Attributes.FileSize)))
- glog.V(4).Infof("%v write [%d,%d) %d", fh.f.fullpath(), req.Offset, req.Offset+int64(len(req.Data)), len(req.Data))
+ // glog.V(4).Infof("%v write [%d,%d) %d", fh.f.fullpath(), req.Offset, req.Offset+int64(len(req.Data)), len(req.Data))
fh.dirtyPages.AddPage(req.Offset, data)
diff --git a/weed/topology/node.go b/weed/topology/node.go
index a23729dd3..4556b4165 100644
--- a/weed/topology/node.go
+++ b/weed/topology/node.go
@@ -242,9 +242,9 @@ func (n *NodeImpl) CollectDeadNodeAndFullVolumes(freshThreshHold int64, volumeSi
for _, v := range dn.GetVolumes() {
if v.Size >= volumeSizeLimit {
//fmt.Println("volume",v.Id,"size",v.Size,">",volumeSizeLimit)
- n.GetTopology().chanFullVolumes <- &v
+ n.GetTopology().chanFullVolumes <- v
}else if float64(v.Size) > float64(volumeSizeLimit) * growThreshold {
- n.GetTopology().chanCrowdedVolumes <- &v
+ n.GetTopology().chanCrowdedVolumes <- v
}
}
}
diff --git a/weed/topology/topology.go b/weed/topology/topology.go
index 3932e3fbb..d704a5636 100644
--- a/weed/topology/topology.go
+++ b/weed/topology/topology.go
@@ -34,8 +34,8 @@ type Topology struct {
Sequence sequence.Sequencer
- chanFullVolumes chan *storage.VolumeInfo
- chanCrowdedVolumes chan *storage.VolumeInfo
+ chanFullVolumes chan storage.VolumeInfo
+ chanCrowdedVolumes chan storage.VolumeInfo
Configuration *Configuration
@@ -57,8 +57,8 @@ func NewTopology(id string, seq sequence.Sequencer, volumeSizeLimit uint64, puls
t.Sequence = seq
- t.chanFullVolumes = make(chan *storage.VolumeInfo)
- t.chanCrowdedVolumes = make(chan *storage.VolumeInfo)
+ t.chanFullVolumes = make(chan storage.VolumeInfo)
+ t.chanCrowdedVolumes = make(chan storage.VolumeInfo)
t.Configuration = &Configuration{}
diff --git a/weed/topology/topology_event_handling.go b/weed/topology/topology_event_handling.go
index 2f4fba932..0f1db74df 100644
--- a/weed/topology/topology_event_handling.go
+++ b/weed/topology/topology_event_handling.go
@@ -39,7 +39,7 @@ func (t *Topology) StartRefreshWritableVolumes(grpcDialOption grpc.DialOption, g
}
}()
}
-func (t *Topology) SetVolumeCapacityFull(volumeInfo *storage.VolumeInfo) bool {
+func (t *Topology) SetVolumeCapacityFull(volumeInfo storage.VolumeInfo) bool {
diskType := types.ToDiskType(volumeInfo.DiskType)
vl := t.GetVolumeLayout(volumeInfo.Collection, volumeInfo.ReplicaPlacement, volumeInfo.Ttl, diskType)
if !vl.SetVolumeCapacityFull(volumeInfo.Id) {
@@ -68,7 +68,7 @@ func (t *Topology) SetVolumeCapacityFull(volumeInfo *storage.VolumeInfo) bool {
return true
}
-func (t *Topology) SetVolumeCrowded(volumeInfo *storage.VolumeInfo) {
+func (t *Topology) SetVolumeCrowded(volumeInfo storage.VolumeInfo) {
diskType := types.ToDiskType(volumeInfo.DiskType)
vl := t.GetVolumeLayout(volumeInfo.Collection, volumeInfo.ReplicaPlacement, volumeInfo.Ttl, diskType)
vl.SetVolumeCrowded(volumeInfo.Id)
diff --git a/weed/topology/volume_layout.go b/weed/topology/volume_layout.go
index 57e511fa0..f315cb7e4 100644
--- a/weed/topology/volume_layout.go
+++ b/weed/topology/volume_layout.go
@@ -108,7 +108,7 @@ type VolumeLayout struct {
diskType types.DiskType
vid2location map[needle.VolumeId]*VolumeLocationList
writables []needle.VolumeId // transient array of writable volume id
- crowded map[needle.VolumeId]interface{}
+ crowded map[needle.VolumeId]struct{}
readonlyVolumes *volumesBinaryState // readonly volumes
oversizedVolumes *volumesBinaryState // oversized volumes
volumeSizeLimit uint64
@@ -129,7 +129,7 @@ func NewVolumeLayout(rp *super_block.ReplicaPlacement, ttl *needle.TTL, diskType
diskType: diskType,
vid2location: make(map[needle.VolumeId]*VolumeLocationList),
writables: *new([]needle.VolumeId),
- crowded: make(map[needle.VolumeId]interface{}),
+ crowded: make(map[needle.VolumeId]struct{}),
readonlyVolumes: NewVolumesBinaryState(readOnlyState, rp, ExistCopies()),
oversizedVolumes: NewVolumesBinaryState(oversizedState, rp, ExistCopies()),
volumeSizeLimit: volumeSizeLimit,
@@ -421,7 +421,7 @@ func (vl *VolumeLayout) removeFromCrowded(vid needle.VolumeId) {
func (vl *VolumeLayout) setVolumeCrowded(vid needle.VolumeId) {
if _, ok := vl.crowded[vid]; !ok {
- vl.crowded[vid] = nil
+ vl.crowded[vid] = struct{}{}
glog.V(0).Infoln("Volume", vid, "becomes crowded")
}
}
diff --git a/weed/util/constants.go b/weed/util/constants.go
index 270cedb6f..9985b111c 100644
--- a/weed/util/constants.go
+++ b/weed/util/constants.go
@@ -5,7 +5,7 @@ import (
)
var (
- VERSION = fmt.Sprintf("%s %d.%02d", sizeLimit, 2, 45)
+ VERSION = fmt.Sprintf("%s %d.%02d", sizeLimit, 2, 47)
COMMIT = ""
)