-rw-r--r--   go.mod                                      2
-rw-r--r--   go.sum                                      2
-rw-r--r--   k8s/seaweedfs/Chart.yaml                    4
-rw-r--r--   k8s/seaweedfs/values.yaml                   2
-rw-r--r--   weed/filer/filerstore_wrapper.go            2
-rw-r--r--   weed/filer/reader_at.go                     6
-rw-r--r--   weed/filesys/dir.go                         2
-rw-r--r--   weed/filesys/dirty_pages_temp_file.go      27
-rw-r--r--   weed/filesys/dirty_pages_temp_interval.go  48
-rw-r--r--   weed/filesys/filehandle.go                  8
-rw-r--r--   weed/topology/node.go                       4
-rw-r--r--   weed/topology/topology.go                   8
-rw-r--r--   weed/topology/topology_event_handling.go    4
-rw-r--r--   weed/topology/volume_layout.go              6
-rw-r--r--   weed/util/constants.go                      2
15 files changed, 80 insertions, 47 deletions
diff --git a/go.mod b/go.mod
--- a/go.mod
+++ b/go.mod
@@ -15,7 +15,7 @@ require (
     github.com/buraksezer/consistent v0.0.0-20191006190839-693edf70fd72
     github.com/bwmarrin/snowflake v0.3.0
     github.com/cespare/xxhash v1.1.0
-    github.com/chrislusf/raft v1.0.6
+    github.com/chrislusf/raft v1.0.7
     github.com/coreos/go-semver v0.3.0 // indirect
     github.com/dgrijalva/jwt-go v3.2.0+incompatible
     github.com/disintegration/imaging v1.6.2
diff --git a/go.sum b/go.sum
--- a/go.sum
+++ b/go.sum
@@ -159,6 +159,8 @@ github.com/chrislusf/raft v1.0.5 h1:g8GxKCSStfm0/bGBDpNEbmEXL6MJkpXX+NI0ksbX5D4=
 github.com/chrislusf/raft v1.0.5/go.mod h1:Ep5DP+mJSosjfKiix1uU7Lc2Df/SX4oGJEpZlXH5l68=
 github.com/chrislusf/raft v1.0.6 h1:wunb85WWhMKhNRn7EmdIw35D4Lmew0ZJv8oYDizR/+Y=
 github.com/chrislusf/raft v1.0.6/go.mod h1:Ep5DP+mJSosjfKiix1uU7Lc2Df/SX4oGJEpZlXH5l68=
+github.com/chrislusf/raft v1.0.7 h1:reybAIwnQOTSgTj1YgflbJFWLSN0KVQSxe8gDZYa04o=
+github.com/chrislusf/raft v1.0.7/go.mod h1:Ep5DP+mJSosjfKiix1uU7Lc2Df/SX4oGJEpZlXH5l68=
 github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI=
 github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI=
 github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU=
diff --git a/k8s/seaweedfs/Chart.yaml b/k8s/seaweedfs/Chart.yaml
index 91ae0a251..b27e50f57 100644
--- a/k8s/seaweedfs/Chart.yaml
+++ b/k8s/seaweedfs/Chart.yaml
@@ -1,5 +1,5 @@
 apiVersion: v1
 description: SeaweedFS
 name: seaweedfs
-appVersion: "2.45"
-version: 2.45
+appVersion: "2.47"
+version: 2.47
diff --git a/k8s/seaweedfs/values.yaml b/k8s/seaweedfs/values.yaml
index 82dbc60ee..8c2c11a6b 100644
--- a/k8s/seaweedfs/values.yaml
+++ b/k8s/seaweedfs/values.yaml
@@ -4,7 +4,7 @@ global:
   registry: ""
   repository: ""
   imageName: chrislusf/seaweedfs
-  # imageTag: "2.45" - started using {.Chart.appVersion}
+  # imageTag: "2.47" - started using {.Chart.appVersion}
   imagePullPolicy: IfNotPresent
   imagePullSecrets: imagepullsecret
   restartPolicy: Always
diff --git a/weed/filer/filerstore_wrapper.go b/weed/filer/filerstore_wrapper.go
index cd7c0bea3..5175a87a1 100644
--- a/weed/filer/filerstore_wrapper.go
+++ b/weed/filer/filerstore_wrapper.go
@@ -150,7 +150,7 @@ func (fsw *FilerStoreWrapper) FindEntry(ctx context.Context, fp util.FullPath) (
     }()

     entry, err = actualStore.FindEntry(ctx, fp)
-    glog.V(4).Infof("FindEntry %s: %v", fp, err)
+    // glog.V(4).Infof("FindEntry %s: %v", fp, err)
     if err != nil {
         return nil, err
     }
diff --git a/weed/filer/reader_at.go b/weed/filer/reader_at.go
index b03b3bbb4..458cf88be 100644
--- a/weed/filer/reader_at.go
+++ b/weed/filer/reader_at.go
@@ -106,7 +106,7 @@ func (c *ChunkReadAt) ReadAt(p []byte, offset int64) (n int, err error) {
     c.readerLock.Lock()
     defer c.readerLock.Unlock()

-    glog.V(4).Infof("ReadAt [%d,%d) of total file size %d bytes %d chunk views", offset, offset+int64(len(p)), c.fileSize, len(c.chunkViews))
+    // glog.V(4).Infof("ReadAt [%d,%d) of total file size %d bytes %d chunk views", offset, offset+int64(len(p)), c.fileSize, len(c.chunkViews))
     return c.doReadAt(p, offset)
 }
@@ -137,7 +137,7 @@ func (c *ChunkReadAt) doReadAt(p []byte, offset int64) (n int, err error) {
         if chunkStart >= chunkStop {
             continue
         }
-        glog.V(4).Infof("read [%d,%d), %d/%d chunk %s [%d,%d)", chunkStart, chunkStop, i, len(c.chunkViews), chunk.FileId, chunk.LogicOffset-chunk.Offset, chunk.LogicOffset-chunk.Offset+int64(chunk.Size))
+        // glog.V(4).Infof("read [%d,%d), %d/%d chunk %s [%d,%d)", chunkStart, chunkStop, i, len(c.chunkViews), chunk.FileId, chunk.LogicOffset-chunk.Offset, chunk.LogicOffset-chunk.Offset+int64(chunk.Size))
         var buffer []byte
         bufferOffset := chunkStart - chunk.LogicOffset + chunk.Offset
         bufferLength := chunkStop - chunkStart
@@ -152,7 +152,7 @@ func (c *ChunkReadAt) doReadAt(p []byte, offset int64) (n int, err error) {
         startOffset, remaining = startOffset+int64(copied), remaining-int64(copied)
     }

-    glog.V(4).Infof("doReadAt [%d,%d), n:%v, err:%v", offset, offset+int64(len(p)), n, err)
+    // glog.V(4).Infof("doReadAt [%d,%d), n:%v, err:%v", offset, offset+int64(len(p)), n, err)

     if err == nil && remaining > 0 && c.fileSize > startOffset {
         delta := int(min(remaining, c.fileSize-startOffset))
diff --git a/weed/filesys/dir.go b/weed/filesys/dir.go
index 09d5fd449..72e41247f 100644
--- a/weed/filesys/dir.go
+++ b/weed/filesys/dir.go
@@ -296,7 +296,7 @@ func (dir *Dir) Mkdir(ctx context.Context, req *fuse.MkdirRequest) (fs.Node, err
 func (dir *Dir) Lookup(ctx context.Context, req *fuse.LookupRequest, resp *fuse.LookupResponse) (node fs.Node, err error) {

     dirPath := util.FullPath(dir.FullPath())
-    glog.V(4).Infof("dir Lookup %s: %s by %s", dirPath, req.Name, req.Header.String())
+    // glog.V(4).Infof("dir Lookup %s: %s by %s", dirPath, req.Name, req.Header.String())

     fullFilePath := dirPath.Child(req.Name)
     visitErr := meta_cache.EnsureVisited(dir.wfs.metaCache, dir.wfs, dirPath)
diff --git a/weed/filesys/dirty_pages_temp_file.go b/weed/filesys/dirty_pages_temp_file.go
index a04efb6aa..58c150acf 100644
--- a/weed/filesys/dirty_pages_temp_file.go
+++ b/weed/filesys/dirty_pages_temp_file.go
@@ -56,18 +56,23 @@ func (pages *TempFileDirtyPages) AddPage(offset int64, data []byte) {
         }
         pages.tf = tf
         pages.writtenIntervals.tempFile = tf
+        pages.writtenIntervals.lastOffset = 0
     }

-    writtenOffset := pages.writtenIntervals.TotalSize()
+    writtenOffset := pages.writtenIntervals.lastOffset
+    dataSize := int64(len(data))

-    glog.V(4).Infof("%s AddPage %v at %d [%d,%d)", pages.f.fullpath(), pages.tf.Name(), writtenOffset, offset, offset+int64(len(data)))
+    // glog.V(4).Infof("%s AddPage %v at %d [%d,%d)", pages.f.fullpath(), pages.tf.Name(), writtenOffset, offset, offset+dataSize)

     if _, err := pages.tf.WriteAt(data, writtenOffset); err != nil {
         pages.lastErr = err
     } else {
         pages.writtenIntervals.AddInterval(writtenOffset, len(data), offset)
+        pages.writtenIntervals.lastOffset += dataSize
     }
+    // pages.writtenIntervals.debug()
+
     return
 }
@@ -81,6 +86,11 @@ func (pages *TempFileDirtyPages) FlushData() error {
     pages.pageAddLock.Lock()
     defer pages.pageAddLock.Unlock()
     if pages.tf != nil {
+
+        pages.writtenIntervals.tempFile = nil
+        pages.writtenIntervals.lists = nil
+
+        pages.tf.Close()
         os.Remove(pages.tf.Name())
         pages.tf = nil
     }
@@ -91,15 +101,16 @@ func (pages *TempFileDirtyPages) saveExistingPagesToStorage() {
     pageSize := pages.f.wfs.option.ChunkSizeLimit

-    uploadedSize := int64(0)
+    // glog.V(4).Infof("%v saveExistingPagesToStorage %d lists", pages.f.Name, len(pages.writtenIntervals.lists))
+
     for _, list := range pages.writtenIntervals.lists {
-        for {
-            start, stop := max(list.Offset(), uploadedSize), min(list.Offset()+list.Size(), uploadedSize+pageSize)
+        listStopOffset := list.Offset() + list.Size()
+        for uploadedOffset := int64(0); uploadedOffset < listStopOffset; uploadedOffset += pageSize {
+            start, stop := max(list.Offset(), uploadedOffset), min(listStopOffset, uploadedOffset+pageSize)
             if start >= stop {
-                break
+                continue
             }
-            uploadedSize = stop
-            glog.V(4).Infof("uploading %v [%d,%d)", pages.f.Name, start, stop)
+            // glog.V(4).Infof("uploading %v [%d,%d) %d/%d", pages.f.Name, start, stop, i, len(pages.writtenIntervals.lists))
             pages.saveToStorage(list.ToReader(start, stop), start, stop-start)
         }
     }
diff --git a/weed/filesys/dirty_pages_temp_interval.go b/weed/filesys/dirty_pages_temp_interval.go
index f423b0e85..2d22845e2 100644
--- a/weed/filesys/dirty_pages_temp_interval.go
+++ b/weed/filesys/dirty_pages_temp_interval.go
@@ -1,8 +1,8 @@
 package filesys

 import (
-    "github.com/chrislusf/seaweedfs/weed/glog"
     "io"
+    "log"
     "os"
 )
@@ -20,8 +20,9 @@ type WrittenIntervalLinkedList struct {
 }

 type WrittenContinuousIntervals struct {
-    tempFile *os.File
-    lists    []*WrittenIntervalLinkedList
+    tempFile   *os.File
+    lastOffset int64
+    lists      []*WrittenIntervalLinkedList
 }

 func (list *WrittenIntervalLinkedList) Offset() int64 {
@@ -95,6 +96,21 @@ func (list *WrittenIntervalLinkedList) subList(start, stop int64) *WrittenInterv
     }
 }

+func (c *WrittenContinuousIntervals) debug() {
+    log.Printf("++")
+    for _, l := range c.lists {
+        log.Printf("++++")
+        for t := l.Head; ; t = t.Next {
+            log.Printf("[%d,%d) => [%d,%d) %d", t.DataOffset, t.DataOffset+t.Size, t.TempOffset, t.TempOffset+t.Size, t.Size)
+            if t.Next == nil {
+                break
+            }
+        }
+        log.Printf("----")
+    }
+    log.Printf("--")
+}
+
 func (c *WrittenContinuousIntervals) AddInterval(tempOffset int64, dataSize int, dataOffset int64) {

     interval := &WrittenIntervalNode{DataOffset: dataOffset, TempOffset: tempOffset, Size: int64(dataSize)}
@@ -223,6 +239,7 @@ func (l *WrittenIntervalLinkedList) ToReader(start int64, stop int64) io.Reader
     for t := l.Head; ; t = t.Next {
         startOffset, stopOffset := max(t.DataOffset, start), min(t.DataOffset+t.Size, stop)
         if startOffset < stopOffset {
+            // log.Printf("ToReader read [%d,%d) from [%d,%d) %d", t.DataOffset, t.DataOffset+t.Size, t.TempOffset, t.TempOffset+t.Size, t.Size)
             readers = append(readers, newFileSectionReader(l.tempFile, startOffset-t.DataOffset+t.TempOffset, startOffset, stopOffset-startOffset))
         }
         if t.Next == nil {
@@ -236,29 +253,32 @@ func (l *WrittenIntervalLinkedList) ToReader(start int64, stop int64) io.Reader
 }

 type FileSectionReader struct {
-    file      *os.File
-    Offset    int64
-    dataStart int64
-    dataStop  int64
+    file            *os.File
+    tempStartOffset int64
+    Offset          int64
+    dataStart       int64
+    dataStop        int64
 }

 var _ = io.Reader(&FileSectionReader{})

 func newFileSectionReader(tempfile *os.File, offset int64, dataOffset int64, size int64) *FileSectionReader {
     return &FileSectionReader{
-        file:      tempfile,
-        Offset:    offset,
-        dataStart: dataOffset,
-        dataStop:  dataOffset + size,
+        file:            tempfile,
+        tempStartOffset: offset,
+        Offset:          offset,
+        dataStart:       dataOffset,
+        dataStop:        dataOffset + size,
     }
 }

 func (f *FileSectionReader) Read(p []byte) (n int, err error) {
-    dataLen := min(f.dataStop-f.Offset, int64(len(p)))
-    if dataLen < 0 {
+    remaining := (f.dataStop - f.dataStart) - (f.Offset - f.tempStartOffset)
+    if remaining <= 0 {
         return 0, io.EOF
     }
-    glog.V(4).Infof("reading %v [%d,%d)", f.file.Name(), f.Offset, f.Offset+dataLen)
+    dataLen := min(remaining, int64(len(p)))
+    // glog.V(4).Infof("reading [%d,%d) from %v [%d,%d)/[%d,%d) %d", f.Offset-f.tempStartOffset+f.dataStart, f.Offset-f.tempStartOffset+f.dataStart+dataLen, f.file.Name(), f.Offset, f.Offset+dataLen, f.tempStartOffset, f.tempStartOffset+f.dataStop-f.dataStart, f.dataStop-f.dataStart)
     n, err = f.file.ReadAt(p[:dataLen], f.Offset)
     if n > 0 {
         f.Offset += int64(n)
diff --git a/weed/filesys/filehandle.go b/weed/filesys/filehandle.go
index 1245cce71..88cfe45f0 100644
--- a/weed/filesys/filehandle.go
+++ b/weed/filesys/filehandle.go
@@ -38,8 +38,8 @@ type FileHandle struct {
 func newFileHandle(file *File, uid, gid uint32, writeOnly bool) *FileHandle {
     fh := &FileHandle{
         f: file,
-        dirtyPages: newContinuousDirtyPages(file, writeOnly),
-        /// dirtyPages: newTempFileDirtyPages(file, writeOnly),
+        // dirtyPages: newContinuousDirtyPages(file, writeOnly),
+        dirtyPages: newTempFileDirtyPages(file, writeOnly),
         Uid:        uid,
         Gid:        gid,
     }
@@ -149,7 +149,7 @@ func (fh *FileHandle) readFromChunks(buff []byte, offset int64) (int64, error) {
         glog.Errorf("file handle read %s: %v", fileFullPath, err)
     }

-    glog.V(4).Infof("file handle read %s [%d,%d] %d : %v", fileFullPath, offset, offset+int64(totalRead), totalRead, err)
+    // glog.V(4).Infof("file handle read %s [%d,%d] %d : %v", fileFullPath, offset, offset+int64(totalRead), totalRead, err)

     return int64(totalRead), err
 }
@@ -175,7 +175,7 @@ func (fh *FileHandle) Write(ctx context.Context, req *fuse.WriteRequest, resp *f
     entry.Content = nil
     entry.Attributes.FileSize = uint64(max(req.Offset+int64(len(data)), int64(entry.Attributes.FileSize)))
-    glog.V(4).Infof("%v write [%d,%d) %d", fh.f.fullpath(), req.Offset, req.Offset+int64(len(req.Data)), len(req.Data))
+    // glog.V(4).Infof("%v write [%d,%d) %d", fh.f.fullpath(), req.Offset, req.Offset+int64(len(req.Data)), len(req.Data))

     fh.dirtyPages.AddPage(req.Offset, data)
diff --git a/weed/topology/node.go b/weed/topology/node.go
index a23729dd3..4556b4165 100644
--- a/weed/topology/node.go
+++ b/weed/topology/node.go
@@ -242,9 +242,9 @@ func (n *NodeImpl) CollectDeadNodeAndFullVolumes(freshThreshHold int64, volumeSi
         for _, v := range dn.GetVolumes() {
             if v.Size >= volumeSizeLimit {
                 //fmt.Println("volume",v.Id,"size",v.Size,">",volumeSizeLimit)
-                n.GetTopology().chanFullVolumes <- &v
+                n.GetTopology().chanFullVolumes <- v
             }else if float64(v.Size) > float64(volumeSizeLimit) * growThreshold {
-                n.GetTopology().chanCrowdedVolumes <- &v
+                n.GetTopology().chanCrowdedVolumes <- v
             }
         }
     }
diff --git a/weed/topology/topology.go b/weed/topology/topology.go
index 3932e3fbb..d704a5636 100644
--- a/weed/topology/topology.go
+++ b/weed/topology/topology.go
@@ -34,8 +34,8 @@ type Topology struct {

     Sequence sequence.Sequencer

-    chanFullVolumes    chan *storage.VolumeInfo
-    chanCrowdedVolumes chan *storage.VolumeInfo
+    chanFullVolumes    chan storage.VolumeInfo
+    chanCrowdedVolumes chan storage.VolumeInfo

     Configuration *Configuration

@@ -57,8 +57,8 @@ func NewTopology(id string, seq sequence.Sequencer, volumeSizeLimit uint64, puls

     t.Sequence = seq

-    t.chanFullVolumes = make(chan *storage.VolumeInfo)
-    t.chanCrowdedVolumes = make(chan *storage.VolumeInfo)
+    t.chanFullVolumes = make(chan storage.VolumeInfo)
+    t.chanCrowdedVolumes = make(chan storage.VolumeInfo)

     t.Configuration = &Configuration{}
diff --git a/weed/topology/topology_event_handling.go b/weed/topology/topology_event_handling.go
index 2f4fba932..0f1db74df 100644
--- a/weed/topology/topology_event_handling.go
+++ b/weed/topology/topology_event_handling.go
@@ -39,7 +39,7 @@ func (t *Topology) StartRefreshWritableVolumes(grpcDialOption grpc.DialOption, g
         }
     }()
 }
-func (t *Topology) SetVolumeCapacityFull(volumeInfo *storage.VolumeInfo) bool {
+func (t *Topology) SetVolumeCapacityFull(volumeInfo storage.VolumeInfo) bool {
     diskType := types.ToDiskType(volumeInfo.DiskType)
     vl := t.GetVolumeLayout(volumeInfo.Collection, volumeInfo.ReplicaPlacement, volumeInfo.Ttl, diskType)
     if !vl.SetVolumeCapacityFull(volumeInfo.Id) {
@@ -68,7 +68,7 @@ func (t *Topology) SetVolumeCapacityFull(volumeInfo *storage.VolumeInfo) bool {
     return true
 }

-func (t *Topology) SetVolumeCrowded(volumeInfo *storage.VolumeInfo) {
+func (t *Topology) SetVolumeCrowded(volumeInfo storage.VolumeInfo) {
     diskType := types.ToDiskType(volumeInfo.DiskType)
     vl := t.GetVolumeLayout(volumeInfo.Collection, volumeInfo.ReplicaPlacement, volumeInfo.Ttl, diskType)
     vl.SetVolumeCrowded(volumeInfo.Id)
diff --git a/weed/topology/volume_layout.go b/weed/topology/volume_layout.go
index 57e511fa0..f315cb7e4 100644
--- a/weed/topology/volume_layout.go
+++ b/weed/topology/volume_layout.go
@@ -108,7 +108,7 @@ type VolumeLayout struct {
     diskType         types.DiskType
     vid2location     map[needle.VolumeId]*VolumeLocationList
     writables        []needle.VolumeId // transient array of writable volume id
-    crowded          map[needle.VolumeId]interface{}
+    crowded          map[needle.VolumeId]struct{}
     readonlyVolumes  *volumesBinaryState // readonly volumes
     oversizedVolumes *volumesBinaryState // oversized volumes
     volumeSizeLimit  uint64
@@ -129,7 +129,7 @@ func NewVolumeLayout(rp *super_block.ReplicaPlacement, ttl *needle.TTL, diskType
         diskType:         diskType,
         vid2location:     make(map[needle.VolumeId]*VolumeLocationList),
         writables:        *new([]needle.VolumeId),
-        crowded:          make(map[needle.VolumeId]interface{}),
+        crowded:          make(map[needle.VolumeId]struct{}),
         readonlyVolumes:  NewVolumesBinaryState(readOnlyState, rp, ExistCopies()),
         oversizedVolumes: NewVolumesBinaryState(oversizedState, rp, ExistCopies()),
         volumeSizeLimit:  volumeSizeLimit,
@@ -421,7 +421,7 @@ func (vl *VolumeLayout) removeFromCrowded(vid needle.VolumeId) {

 func (vl *VolumeLayout) setVolumeCrowded(vid needle.VolumeId) {
     if _, ok := vl.crowded[vid]; !ok {
-        vl.crowded[vid] = nil
+        vl.crowded[vid] = struct{}{}
         glog.V(0).Infoln("Volume", vid, "becomes crowded")
     }
 }
diff --git a/weed/util/constants.go b/weed/util/constants.go
index 270cedb6f..9985b111c 100644
--- a/weed/util/constants.go
+++ b/weed/util/constants.go
@@ -5,7 +5,7 @@ import (
 )

 var (
-    VERSION = fmt.Sprintf("%s %d.%02d", sizeLimit, 2, 45)
+    VERSION = fmt.Sprintf("%s %d.%02d", sizeLimit, 2, 47)
     COMMIT  = ""
 )
