author    chrislu <chris.lu@gmail.com>  2022-07-13 02:30:53 -0700
committer chrislu <chris.lu@gmail.com>  2022-07-13 02:30:53 -0700
commit    9c0459685e4784d18c30105861e479af3aadb84f (patch)
tree      5a467b9a57223f8d6e35922b1f84d065747915e2
parent    9a712df6eea5d24cba6fb068c913c3f2f771f4c3 (diff)
parent    289402a741be0ee7370ae1d27834afb23003596d (diff)
Merge branch 'master' into messaging
-rw-r--r--  weed/filer/reader_cache.go           6
-rw-r--r--  weed/filer/reader_pattern.go        29
-rw-r--r--  weed/mount/page_writer.go            4
-rw-r--r--  weed/mount/page_writer_pattern.go   38
-rw-r--r--  weed/mount/weedfs_file_write.go      2
-rw-r--r--  weed/shell/command_s3_configure.go  12
-rw-r--r--  weed/storage/disk_location.go        7
7 files changed, 47 insertions, 51 deletions
diff --git a/weed/filer/reader_cache.go b/weed/filer/reader_cache.go
index c319f6c78..4c92f71c8 100644
--- a/weed/filer/reader_cache.go
+++ b/weed/filer/reader_cache.go
@@ -18,7 +18,7 @@ type ReaderCache struct {
}
type SingleChunkCacher struct {
- sync.RWMutex
+ sync.Mutex
cond *sync.Cond
parent *ReaderCache
chunkFileId string
@@ -183,8 +183,8 @@ func (s *SingleChunkCacher) destroy() {
}
func (s *SingleChunkCacher) readChunkAt(buf []byte, offset int64) (int, error) {
- s.RLock()
- defer s.RUnlock()
+ s.Lock()
+ defer s.Unlock()
for s.completedTime.IsZero() {
s.cond.Wait()
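
The RWMutex-to-Mutex switch here is not cosmetic: sync.Cond.Wait atomically unlocks and re-locks the exact Locker the Cond was constructed with, so a goroutine holding only the read half of an RWMutex cannot safely Wait on a Cond bound to the write lock. A minimal, self-contained sketch of the wait pattern (illustrative names, not the SeaweedFS types):

package main

import (
    "fmt"
    "sync"
    "time"
)

// cacher mimics SingleChunkCacher's synchronization: one filler goroutine,
// many readers blocking until the chunk data is complete.
type cacher struct {
    sync.Mutex
    cond *sync.Cond
    done bool
    data []byte
}

func newCacher() *cacher {
    c := &cacher{}
    c.cond = sync.NewCond(&c.Mutex) // Wait/Broadcast pivot on this exact lock
    return c
}

func (c *cacher) fill(p []byte) {
    c.Lock()
    c.data, c.done = p, true
    c.Unlock()
    c.cond.Broadcast() // wake every reader parked in Wait
}

func (c *cacher) read() []byte {
    c.Lock()
    defer c.Unlock()
    for !c.done { // re-check the condition after every wakeup
        c.cond.Wait() // unlocks c.Mutex, sleeps, re-locks before returning
    }
    return c.data
}

func main() {
    c := newCacher()
    go func() { time.Sleep(10 * time.Millisecond); c.fill([]byte("chunk")) }()
    fmt.Printf("%s\n", c.read()) // prints "chunk" once the filler finishes
}
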
diff --git a/weed/filer/reader_pattern.go b/weed/filer/reader_pattern.go
index b860bc577..ec73c59a2 100644
--- a/weed/filer/reader_pattern.go
+++ b/weed/filer/reader_pattern.go
@@ -1,8 +1,8 @@
package filer
type ReaderPattern struct {
- isStreaming bool
- lastReadOffset int64
+ isSequentialCounter int64
+ lastReadStopOffset int64
}
// For streaming read: only cache the first chunk
@@ -10,29 +10,20 @@ type ReaderPattern struct {
func NewReaderPattern() *ReaderPattern {
return &ReaderPattern{
- isStreaming: true,
- lastReadOffset: -1,
+ isSequentialCounter: 0,
+ lastReadStopOffset: 0,
}
}
func (rp *ReaderPattern) MonitorReadAt(offset int64, size int) {
- isStreaming := true
- if rp.lastReadOffset > offset {
- isStreaming = false
+ if rp.lastReadStopOffset == offset {
+ rp.isSequentialCounter++
+ } else {
+ rp.isSequentialCounter--
}
- if rp.lastReadOffset == -1 {
- if offset != 0 {
- isStreaming = false
- }
- }
- rp.lastReadOffset = offset
- rp.isStreaming = isStreaming
-}
-
-func (rp *ReaderPattern) IsStreamingMode() bool {
- return rp.isStreaming
+ rp.lastReadStopOffset = offset + int64(size)
}
func (rp *ReaderPattern) IsRandomMode() bool {
- return !rp.isStreaming
+ return rp.isSequentialCounter < 0
}
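
The rewrite replaces a sticky boolean with a signed counter: a read that starts exactly where the previous one stopped pushes the counter up, any other offset pushes it down, and the sign decides the mode. One consequence of initializing lastReadStopOffset to 0 is that a first read at offset 0 already counts as sequential. A toy reproduction of the heuristic (not the SeaweedFS API):

package main

import "fmt"

type pattern struct {
    counter  int64
    lastStop int64
}

func (p *pattern) monitor(offset, size int64) {
    if p.lastStop == offset {
        p.counter++ // read continues where the last one ended
    } else {
        p.counter-- // a seek happened
    }
    p.lastStop = offset + size
}

func (p *pattern) random() bool { return p.counter < 0 }

func main() {
    p := &pattern{}
    for _, r := range [][2]int64{{0, 4}, {4, 4}, {8, 4}} { // contiguous reads
        p.monitor(r[0], r[1])
    }
    fmt.Println(p.random()) // false: counter is +3
    for _, r := range [][2]int64{{100, 4}, {0, 4}, {50, 4}, {7, 4}} { // seeks
        p.monitor(r[0], r[1])
    }
    fmt.Println(p.random()) // true: counter fell to -1
}
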
diff --git a/weed/mount/page_writer.go b/weed/mount/page_writer.go
index 016c4841a..7e3db8e28 100644
--- a/weed/mount/page_writer.go
+++ b/weed/mount/page_writer.go
@@ -29,14 +29,14 @@ func newPageWriter(fh *FileHandle, chunkSize int64) *PageWriter {
return pw
}
-func (pw *PageWriter) AddPage(offset int64, data []byte, isSequentail bool) {
+func (pw *PageWriter) AddPage(offset int64, data []byte, isSequential bool) {
glog.V(4).Infof("%v AddPage [%d, %d)", pw.fh.fh, offset, offset+int64(len(data)))
chunkIndex := offset / pw.chunkSize
for i := chunkIndex; len(data) > 0; i++ {
writeSize := min(int64(len(data)), (i+1)*pw.chunkSize-offset)
- pw.addToOneChunk(i, offset, data[:writeSize], isSequentail)
+ pw.addToOneChunk(i, offset, data[:writeSize], isSequential)
offset += writeSize
data = data[writeSize:]
}
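
Beyond fixing the isSequentail typo, this hunk shows how AddPage carves a write at an arbitrary offset into per-chunk slices: each iteration writes at most up to the next chunk boundary at (i+1)*chunkSize. A standalone sketch with made-up sizes:

package main

import "fmt"

func min(a, b int64) int64 {
    if a < b {
        return a
    }
    return b
}

func splitWrite(offset int64, data []byte, chunkSize int64) {
    chunkIndex := offset / chunkSize
    for i := chunkIndex; len(data) > 0; i++ {
        // bytes that still fit before chunk i ends
        writeSize := min(int64(len(data)), (i+1)*chunkSize-offset)
        fmt.Printf("chunk %d: [%d, %d)\n", i, offset, offset+writeSize)
        offset += writeSize
        data = data[writeSize:]
    }
}

func main() {
    // a 200-byte write at offset 100 with 64-byte chunks spans chunks 1..4
    splitWrite(100, make([]byte, 200), 64)
}
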
diff --git a/weed/mount/page_writer_pattern.go b/weed/mount/page_writer_pattern.go
index 665056b36..1ec9c9d4c 100644
--- a/weed/mount/page_writer_pattern.go
+++ b/weed/mount/page_writer_pattern.go
@@ -1,9 +1,9 @@
package mount
type WriterPattern struct {
- isStreaming bool
- lastWriteOffset int64
- chunkSize int64
+ isSequentialCounter int64
+ lastWriteStopOffset int64
+ chunkSize int64
}
// For streaming write: only cache the first chunk
@@ -12,33 +12,21 @@ type WriterPattern struct {
func NewWriterPattern(chunkSize int64) *WriterPattern {
return &WriterPattern{
- isStreaming: true,
- lastWriteOffset: -1,
- chunkSize: chunkSize,
+ isSequentialCounter: 0,
+ lastWriteStopOffset: 0,
+ chunkSize: chunkSize,
}
}
func (rp *WriterPattern) MonitorWriteAt(offset int64, size int) {
- if rp.lastWriteOffset > offset {
- rp.isStreaming = false
+ if rp.lastWriteStopOffset == offset {
+ rp.isSequentialCounter++
+ } else {
+ rp.isSequentialCounter--
}
- if rp.lastWriteOffset == -1 {
- if offset != 0 {
- rp.isStreaming = false
- }
- }
- rp.lastWriteOffset = offset
-}
-
-func (rp *WriterPattern) IsStreamingMode() bool {
- return rp.isStreaming
-}
-
-func (rp *WriterPattern) IsRandomMode() bool {
- return !rp.isStreaming
+ rp.lastWriteStopOffset = offset + int64(size)
}
-func (rp *WriterPattern) Reset() {
- rp.isStreaming = true
- rp.lastWriteOffset = -1
+func (rp *WriterPattern) IsSequentialMode() bool {
+ return rp.isSequentialCounter >= 0
}
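
One property the counter buys over the removed boolean flag (which any out-of-order write flipped to random for good, until Reset) is hysteresis: a lone seek inside a long sequential stream no longer changes the mode. A short demo, assuming it sits in the same mount package as the diff above; demoHysteresis and the chunk size are illustrative only:

package mount

import "fmt"

// demoHysteresis is not part of the commit; it exercises the diff's types.
func demoHysteresis() {
    p := NewWriterPattern(1 << 20) // chunk size is arbitrary here
    p.MonitorWriteAt(0, 4096)     // contiguous: counter +1
    p.MonitorWriteAt(4096, 4096)  // contiguous: counter +2
    p.MonitorWriteAt(0, 512)      // seek back: counter drops to +1
    p.MonitorWriteAt(512, 4096)   // contiguous again: counter +2
    fmt.Println(p.IsSequentialMode()) // true: the lone seek was absorbed
}
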
diff --git a/weed/mount/weedfs_file_write.go b/weed/mount/weedfs_file_write.go
index d14680752..2b7a6cea2 100644
--- a/weed/mount/weedfs_file_write.go
+++ b/weed/mount/weedfs_file_write.go
@@ -58,7 +58,7 @@ func (wfs *WFS) Write(cancel <-chan struct{}, in *fuse.WriteIn, data []byte) (wr
entry.Attributes.FileSize = uint64(max(offset+int64(len(data)), int64(entry.Attributes.FileSize)))
// glog.V(4).Infof("%v write [%d,%d) %d", fh.f.fullpath(), req.Offset, req.Offset+int64(len(req.Data)), len(req.Data))
- fh.dirtyPages.AddPage(offset, data, fh.dirtyPages.writerPattern.IsStreamingMode())
+ fh.dirtyPages.AddPage(offset, data, fh.dirtyPages.writerPattern.IsSequentialMode())
written = uint32(len(data))
diff --git a/weed/shell/command_s3_configure.go b/weed/shell/command_s3_configure.go
index ddcafd847..0660b7889 100644
--- a/weed/shell/command_s3_configure.go
+++ b/weed/shell/command_s3_configure.go
@@ -2,6 +2,7 @@ package shell
import (
"bytes"
+ "errors"
"flag"
"fmt"
"github.com/chrislusf/seaweedfs/weed/filer"
@@ -164,6 +165,17 @@ func (c *commandS3Configure) Do(args []string, commandEnv *CommandEnv, writer io
s3cfg.Identities = append(s3cfg.Identities, &identity)
}
+ accessKeySet := make(map[string]string)
+ for _, ident := range s3cfg.Identities {
+ for _, cred := range ident.Credentials {
+ if userName, found := accessKeySet[cred.AccessKey]; !found {
+ accessKeySet[cred.AccessKey] = ident.Name
+ } else {
+ return errors.New(fmt.Sprintf("duplicate accessKey[%s], already configured in user[%s]", cred.AccessKey, userName))
+ }
+ }
+ }
+
buf.Reset()
filer.ProtoToText(&buf, s3cfg)
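
The added validation rejects a configuration in which two identities share an access key, keying a map by access key and remembering the first owner. The same check pulled out into a runnable sketch; the identity and credential shapes below are simplified stand-ins for the s3 config protobuf types:

package main

import "fmt"

type credential struct{ AccessKey string }

type identity struct {
    Name        string
    Credentials []credential
}

func checkDuplicateAccessKeys(idents []identity) error {
    seen := make(map[string]string) // accessKey -> first owning user
    for _, ident := range idents {
        for _, cred := range ident.Credentials {
            if owner, found := seen[cred.AccessKey]; found {
                return fmt.Errorf("duplicate accessKey[%s], already configured in user[%s]", cred.AccessKey, owner)
            }
            seen[cred.AccessKey] = ident.Name
        }
    }
    return nil
}

func main() {
    idents := []identity{
        {Name: "alice", Credentials: []credential{{AccessKey: "AK1"}}},
        {Name: "bob", Credentials: []credential{{AccessKey: "AK1"}}},
    }
    fmt.Println(checkDuplicateAccessKeys(idents)) // reports the duplicate AK1
}
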
diff --git a/weed/storage/disk_location.go b/weed/storage/disk_location.go
index 847324838..a2a63acbb 100644
--- a/weed/storage/disk_location.go
+++ b/weed/storage/disk_location.go
@@ -4,6 +4,7 @@ import (
"fmt"
"os"
"path/filepath"
+ "runtime"
"strings"
"sync"
"time"
@@ -206,7 +207,11 @@ func (l *DiskLocation) concurrentLoadingVolumes(needleMapKind NeedleMapKind, con
func (l *DiskLocation) loadExistingVolumes(needleMapKind NeedleMapKind) {
- l.concurrentLoadingVolumes(needleMapKind, 10)
+ workerNum := runtime.NumCPU()
+ if workerNum <= 10 {
+ workerNum = 10
+ }
+ l.concurrentLoadingVolumes(needleMapKind, workerNum)
glog.V(0).Infof("Store started on dir: %s with %d volumes max %d", l.Directory, len(l.volumes), l.MaxVolumeCount)
l.loadAllEcShards()
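
Volume loading is no longer pinned to 10 workers: it scales with the CPU count while keeping 10 as the floor for small machines. The same intent as a tiny stdlib-only helper; volumeLoadWorkers is a hypothetical name, since the commit inlines this logic:

package main

import (
    "fmt"
    "runtime"
)

func volumeLoadWorkers() int {
    if n := runtime.NumCPU(); n > 10 {
        return n // one worker per CPU on larger machines
    }
    return 10 // never fewer than the old fixed concurrency
}

func main() {
    fmt.Println("volume loading workers:", volumeLoadWorkers())
}
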