 weed/filer/filer.go                        |  2
 weed/filer/meta_aggregator.go              |  2
 weed/filer/reader_cache.go                 |  6
 weed/filer/reader_pattern.go               | 29
 weed/filer/s3iam_conf.go                   | 20
 weed/filer/s3iam_conf_test.go              | 93
 weed/mount/page_writer.go                  |  4
 weed/mount/page_writer_pattern.go          | 38
 weed/mount/weedfs_file_write.go            |  2
 weed/s3api/auth_credentials.go             |  5
 weed/server/filer_grpc_server_sub_meta.go  |  3
 weed/shell/command_s3_configure.go         |  7
 weed/storage/disk_location.go              | 15
 13 files changed, 169 insertions(+), 57 deletions(-)
diff --git a/weed/filer/filer.go b/weed/filer/filer.go
index 86827c50e..15fe69116 100644
--- a/weed/filer/filer.go
+++ b/weed/filer/filer.go
@@ -79,7 +79,7 @@ func (f *Filer) MaybeBootstrapFromPeers(self pb.ServerAddress, existingNodes []*
return
}
- glog.V(0).Infof("bootstrap from %v", earliestNode.Address)
+ glog.V(0).Infof("bootstrap from %v clientId:%d", earliestNode.Address, f.UniqueFileId)
err = pb.FollowMetadata(pb.ServerAddress(earliestNode.Address), f.GrpcDialOption, "bootstrap", int32(f.UniqueFileId), "/", nil,
0, snapshotTime.UnixNano(), f.Signature, func(resp *filer_pb.SubscribeMetadataResponse) error {
return Replay(f.Store, resp)
diff --git a/weed/filer/meta_aggregator.go b/weed/filer/meta_aggregator.go
index 11f0d9184..beade3201 100644
--- a/weed/filer/meta_aggregator.go
+++ b/weed/filer/meta_aggregator.go
@@ -185,7 +185,7 @@ func (ma *MetaAggregator) doSubscribeToOneFiler(f *Filer, self pb.ServerAddress,
return nil
}
- glog.V(4).Infof("subscribing remote %s meta change: %v", peer, time.Unix(0, lastTsNs))
+ glog.V(4).Infof("subscribing remote %s meta change: %v, clientId:%d", peer, time.Unix(0, lastTsNs), ma.filer.UniqueFileId)
err = pb.WithFilerClient(true, peer, ma.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
diff --git a/weed/filer/reader_cache.go b/weed/filer/reader_cache.go
index c319f6c78..4c92f71c8 100644
--- a/weed/filer/reader_cache.go
+++ b/weed/filer/reader_cache.go
@@ -18,7 +18,7 @@ type ReaderCache struct {
}
type SingleChunkCacher struct {
- sync.RWMutex
+ sync.Mutex
cond *sync.Cond
parent *ReaderCache
chunkFileId string
@@ -183,8 +183,8 @@ func (s *SingleChunkCacher) destroy() {
}
func (s *SingleChunkCacher) readChunkAt(buf []byte, offset int64) (int, error) {
- s.RLock()
- defer s.RUnlock()
+ s.Lock()
+ defer s.Unlock()
for s.completedTime.IsZero() {
s.cond.Wait()
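
The switch from RWMutex to Mutex lines up with the sync.Cond usage here: Cond.Wait calls Unlock on the Locker the Cond was created with, so waiting while holding only a read lock would release a write lock that was never taken. A minimal sketch of the wait/broadcast pattern, with toyCacher standing in for SingleChunkCacher:

package main

import (
	"fmt"
	"sync"
	"time"
)

// toyCacher stands in for SingleChunkCacher; it shows only the locking pattern.
type toyCacher struct {
	sync.Mutex
	cond *sync.Cond
	done bool
}

func newToyCacher() *toyCacher {
	c := &toyCacher{}
	c.cond = sync.NewCond(c) // Wait unlocks and relocks this same Mutex
	return c
}

// wait blocks until complete() has run, re-checking the predicate after every wakeup.
func (c *toyCacher) wait() {
	c.Lock()
	defer c.Unlock()
	for !c.done {
		c.cond.Wait() // atomically releases c.Mutex, blocks, then re-acquires it
	}
}

func (c *toyCacher) complete() {
	c.Lock()
	c.done = true
	c.Unlock()
	c.cond.Broadcast()
}

func main() {
	c := newToyCacher()
	go func() {
		time.Sleep(10 * time.Millisecond)
		c.complete()
	}()
	c.wait()
	fmt.Println("chunk ready")
}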
diff --git a/weed/filer/reader_pattern.go b/weed/filer/reader_pattern.go
index b860bc577..ec73c59a2 100644
--- a/weed/filer/reader_pattern.go
+++ b/weed/filer/reader_pattern.go
@@ -1,8 +1,8 @@
package filer
type ReaderPattern struct {
- isStreaming bool
- lastReadOffset int64
+ isSequentialCounter int64
+ lastReadStopOffset int64
}
// For streaming read: only cache the first chunk
@@ -10,29 +10,20 @@ type ReaderPattern struct {
func NewReaderPattern() *ReaderPattern {
return &ReaderPattern{
- isStreaming: true,
- lastReadOffset: -1,
+ isSequentialCounter: 0,
+ lastReadStopOffset: 0,
}
}
func (rp *ReaderPattern) MonitorReadAt(offset int64, size int) {
- isStreaming := true
- if rp.lastReadOffset > offset {
- isStreaming = false
+ if rp.lastReadStopOffset == offset {
+ rp.isSequentialCounter++
+ } else {
+ rp.isSequentialCounter--
}
- if rp.lastReadOffset == -1 {
- if offset != 0 {
- isStreaming = false
- }
- }
- rp.lastReadOffset = offset
- rp.isStreaming = isStreaming
-}
-
-func (rp *ReaderPattern) IsStreamingMode() bool {
- return rp.isStreaming
+ rp.lastReadStopOffset = offset + int64(size)
}
func (rp *ReaderPattern) IsRandomMode() bool {
- return !rp.isStreaming
+ return rp.isSequentialCounter < 0
}
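
The signed counter makes mode detection tolerant of occasional seeks: a read that resumes exactly at the previous stop offset votes for sequential, anything else votes against, and the sign of the tally decides (a non-negative counter means sequential, matching IsSequentialMode in the write path below). A standalone sketch, with readerPattern as a simplified copy of the type for illustration:

package main

import "fmt"

// readerPattern is a simplified copy of ReaderPattern for illustration.
type readerPattern struct {
	isSequentialCounter int64
	lastReadStopOffset  int64
}

func (rp *readerPattern) monitorReadAt(offset int64, size int) {
	if rp.lastReadStopOffset == offset {
		rp.isSequentialCounter++ // resumes exactly where the last read stopped
	} else {
		rp.isSequentialCounter-- // any seek, forward or backward, votes against
	}
	rp.lastReadStopOffset = offset + int64(size)
}

func (rp *readerPattern) isRandomMode() bool {
	return rp.isSequentialCounter < 0
}

func main() {
	rp := &readerPattern{}
	rp.monitorReadAt(0, 4096)      // counter = 1
	rp.monitorReadAt(4096, 4096)   // counter = 2
	rp.monitorReadAt(0, 512)       // seek back: counter = 1
	fmt.Println(rp.isRandomMode()) // false: history is still net sequential
}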
diff --git a/weed/filer/s3iam_conf.go b/weed/filer/s3iam_conf.go
index 891bf925b..d8f3c2445 100644
--- a/weed/filer/s3iam_conf.go
+++ b/weed/filer/s3iam_conf.go
@@ -2,9 +2,12 @@ package filer
import (
"bytes"
+ "fmt"
+ "io"
+
+ "github.com/chrislusf/seaweedfs/weed/pb/iam_pb"
"github.com/golang/protobuf/jsonpb"
"github.com/golang/protobuf/proto"
- "io"
)
func ParseS3ConfigurationFromBytes[T proto.Message](content []byte, config T) error {
@@ -23,3 +26,18 @@ func ProtoToText(writer io.Writer, config proto.Message) error {
return m.Marshal(writer, config)
}
+
+// CheckDuplicateAccessKey returns an error when s3cfg contains duplicate access keys
+func CheckDuplicateAccessKey(s3cfg *iam_pb.S3ApiConfiguration) error {
+ accessKeySet := make(map[string]string)
+ for _, ident := range s3cfg.Identities {
+ for _, cred := range ident.Credentials {
+ if userName, found := accessKeySet[cred.AccessKey]; !found {
+ accessKeySet[cred.AccessKey] = ident.Name
+ } else {
+ return fmt.Errorf("duplicate accessKey[%s], already configured in user[%s]", cred.AccessKey, userName)
+ }
+ }
+ }
+ return nil
+}
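
A brief usage sketch of the new check, with made-up identities:

package main

import (
	"fmt"

	"github.com/chrislusf/seaweedfs/weed/filer"
	"github.com/chrislusf/seaweedfs/weed/pb/iam_pb"
)

func main() {
	cfg := &iam_pb.S3ApiConfiguration{
		Identities: []*iam_pb.Identity{
			{Name: "alice", Credentials: []*iam_pb.Credential{{AccessKey: "key1", SecretKey: "s1"}}},
			{Name: "bob", Credentials: []*iam_pb.Credential{{AccessKey: "key1", SecretKey: "s2"}}},
		},
	}
	if err := filer.CheckDuplicateAccessKey(cfg); err != nil {
		fmt.Println(err) // duplicate accessKey[key1], already configured in user[alice]
	}
}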
diff --git a/weed/filer/s3iam_conf_test.go b/weed/filer/s3iam_conf_test.go
index da7d9c9f1..bd9eb85ae 100644
--- a/weed/filer/s3iam_conf_test.go
+++ b/weed/filer/s3iam_conf_test.go
@@ -2,9 +2,10 @@ package filer
import (
"bytes"
- . "github.com/chrislusf/seaweedfs/weed/s3api/s3_constants"
"testing"
+ . "github.com/chrislusf/seaweedfs/weed/s3api/s3_constants"
+
"github.com/chrislusf/seaweedfs/weed/pb/iam_pb"
"github.com/stretchr/testify/assert"
@@ -55,3 +56,93 @@ func TestS3Conf(t *testing.T) {
assert.Equal(t, "some_access_key1", s3ConfSaved.Identities[0].Credentials[0].AccessKey)
assert.Equal(t, "some_secret_key2", s3ConfSaved.Identities[1].Credentials[0].SecretKey)
}
+
+func TestCheckDuplicateAccessKey(t *testing.T) {
+ var tests = []struct {
+ s3cfg *iam_pb.S3ApiConfiguration
+ err string
+ }{
+ {
+ &iam_pb.S3ApiConfiguration{
+ Identities: []*iam_pb.Identity{
+ {
+ Name: "some_name",
+ Credentials: []*iam_pb.Credential{
+ {
+ AccessKey: "some_access_key1",
+ SecretKey: "some_secret_key1",
+ },
+ },
+ Actions: []string{
+ ACTION_ADMIN,
+ ACTION_READ,
+ ACTION_WRITE,
+ },
+ },
+ {
+ Name: "some_read_only_user",
+ Credentials: []*iam_pb.Credential{
+ {
+ AccessKey: "some_access_key2",
+ SecretKey: "some_secret_key2",
+ },
+ },
+ Actions: []string{
+ ACTION_READ,
+ ACTION_TAGGING,
+ ACTION_LIST,
+ },
+ },
+ },
+ },
+ "",
+ },
+ {
+ &iam_pb.S3ApiConfiguration{
+ Identities: []*iam_pb.Identity{
+ {
+ Name: "some_name",
+ Credentials: []*iam_pb.Credential{
+ {
+ AccessKey: "some_access_key1",
+ SecretKey: "some_secret_key1",
+ },
+ },
+ Actions: []string{
+ ACTION_ADMIN,
+ ACTION_READ,
+ ACTION_WRITE,
+ },
+ },
+ {
+ Name: "some_read_only_user",
+ Credentials: []*iam_pb.Credential{
+ {
+ AccessKey: "some_access_key1",
+ SecretKey: "some_secret_key1",
+ },
+ },
+ Actions: []string{
+ ACTION_READ,
+ ACTION_TAGGING,
+ ACTION_LIST,
+ },
+ },
+ },
+ },
+ "duplicate accessKey[some_access_key1], already configured in user[some_name]",
+ },
+ }
+ for i, test := range tests {
+ err := CheckDuplicateAccessKey(test.s3cfg)
+ var errString string
+ if err == nil {
+ errString = ""
+ } else {
+ errString = err.Error()
+ }
+ if errString != test.err {
+ t.Errorf("[%d]: got: %s expected: %s", i, errString, test.err)
+ }
+ }
+}
diff --git a/weed/mount/page_writer.go b/weed/mount/page_writer.go
index 016c4841a..7e3db8e28 100644
--- a/weed/mount/page_writer.go
+++ b/weed/mount/page_writer.go
@@ -29,14 +29,14 @@ func newPageWriter(fh *FileHandle, chunkSize int64) *PageWriter {
return pw
}
-func (pw *PageWriter) AddPage(offset int64, data []byte, isSequentail bool) {
+func (pw *PageWriter) AddPage(offset int64, data []byte, isSequential bool) {
glog.V(4).Infof("%v AddPage [%d, %d)", pw.fh.fh, offset, offset+int64(len(data)))
chunkIndex := offset / pw.chunkSize
for i := chunkIndex; len(data) > 0; i++ {
writeSize := min(int64(len(data)), (i+1)*pw.chunkSize-offset)
- pw.addToOneChunk(i, offset, data[:writeSize], isSequentail)
+ pw.addToOneChunk(i, offset, data[:writeSize], isSequential)
offset += writeSize
data = data[writeSize:]
}
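
The loop cuts each write at chunk boundaries so every piece lands in exactly one chunk: writeSize is capped at the distance from offset to the end of chunk i. A worked example, with splitWrite as a hypothetical extraction of the loop that prints ranges instead of calling addToOneChunk:

package main

import "fmt"

func min(a, b int64) int64 {
	if a < b {
		return a
	}
	return b
}

// splitWrite mirrors AddPage's loop, printing each per-chunk range.
func splitWrite(offset int64, data []byte, chunkSize int64) {
	chunkIndex := offset / chunkSize
	for i := chunkIndex; len(data) > 0; i++ {
		writeSize := min(int64(len(data)), (i+1)*chunkSize-offset)
		fmt.Printf("chunk %d gets [%d, %d)\n", i, offset, offset+writeSize)
		offset += writeSize
		data = data[writeSize:]
	}
}

func main() {
	// a 10-byte write at offset 6 with an 8-byte chunk size crosses one boundary:
	// chunk 0 gets [6, 8), chunk 1 gets [8, 16)
	splitWrite(6, make([]byte, 10), 8)
}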
diff --git a/weed/mount/page_writer_pattern.go b/weed/mount/page_writer_pattern.go
index 665056b36..1ec9c9d4c 100644
--- a/weed/mount/page_writer_pattern.go
+++ b/weed/mount/page_writer_pattern.go
@@ -1,9 +1,9 @@
package mount
type WriterPattern struct {
- isStreaming bool
- lastWriteOffset int64
- chunkSize int64
+ isSequentialCounter int64
+ lastWriteStopOffset int64
+ chunkSize int64
}
// For streaming write: only cache the first chunk
@@ -12,33 +12,21 @@ type WriterPattern struct {
func NewWriterPattern(chunkSize int64) *WriterPattern {
return &WriterPattern{
- isStreaming: true,
- lastWriteOffset: -1,
- chunkSize: chunkSize,
+ isSequentialCounter: 0,
+ lastWriteStopOffset: 0,
+ chunkSize: chunkSize,
}
}
func (rp *WriterPattern) MonitorWriteAt(offset int64, size int) {
- if rp.lastWriteOffset > offset {
- rp.isStreaming = false
+ if rp.lastWriteStopOffset == offset {
+ rp.isSequentialCounter++
+ } else {
+ rp.isSequentialCounter--
}
- if rp.lastWriteOffset == -1 {
- if offset != 0 {
- rp.isStreaming = false
- }
- }
- rp.lastWriteOffset = offset
-}
-
-func (rp *WriterPattern) IsStreamingMode() bool {
- return rp.isStreaming
-}
-
-func (rp *WriterPattern) IsRandomMode() bool {
- return !rp.isStreaming
+ rp.lastWriteStopOffset = offset + int64(size)
}
-func (rp *WriterPattern) Reset() {
- rp.isStreaming = true
- rp.lastWriteOffset = -1
+func (rp *WriterPattern) IsSequentialMode() bool {
+ return rp.isSequentialCounter >= 0
}
diff --git a/weed/mount/weedfs_file_write.go b/weed/mount/weedfs_file_write.go
index d14680752..2b7a6cea2 100644
--- a/weed/mount/weedfs_file_write.go
+++ b/weed/mount/weedfs_file_write.go
@@ -58,7 +58,7 @@ func (wfs *WFS) Write(cancel <-chan struct{}, in *fuse.WriteIn, data []byte) (wr
entry.Attributes.FileSize = uint64(max(offset+int64(len(data)), int64(entry.Attributes.FileSize)))
// glog.V(4).Infof("%v write [%d,%d) %d", fh.f.fullpath(), req.Offset, req.Offset+int64(len(req.Data)), len(req.Data))
- fh.dirtyPages.AddPage(offset, data, fh.dirtyPages.writerPattern.IsStreamingMode())
+ fh.dirtyPages.AddPage(offset, data, fh.dirtyPages.writerPattern.IsSequentialMode())
written = uint32(len(data))
diff --git a/weed/s3api/auth_credentials.go b/weed/s3api/auth_credentials.go
index fb23d9ce9..f9e97ea22 100644
--- a/weed/s3api/auth_credentials.go
+++ b/weed/s3api/auth_credentials.go
@@ -109,6 +109,11 @@ func (iam *IdentityAccessManagement) LoadS3ApiConfigurationFromBytes(content []b
glog.Warningf("unmarshal error: %v", err)
return fmt.Errorf("unmarshal error: %v", err)
}
+
+ if err := filer.CheckDuplicateAccessKey(s3ApiConfiguration); err != nil {
+ return err
+ }
+
if err := iam.loadS3ApiConfiguration(s3ApiConfiguration); err != nil {
return err
}
diff --git a/weed/server/filer_grpc_server_sub_meta.go b/weed/server/filer_grpc_server_sub_meta.go
index 745379e7c..6a4a5bb17 100644
--- a/weed/server/filer_grpc_server_sub_meta.go
+++ b/weed/server/filer_grpc_server_sub_meta.go
@@ -90,6 +90,9 @@ func (fs *FilerServer) SubscribeLocalMetadata(req *filer_pb.SubscribeMetadataReq
peerAddress := findClientAddress(stream.Context(), 0)
+ // use negative client id to differentiate from addClient()/deleteClient() used in SubscribeMetadata()
+ req.ClientId = -req.ClientId
+
alreadyKnown, clientName := fs.addClient(req.ClientName, peerAddress, req.ClientId)
if alreadyKnown {
return fmt.Errorf("duplicated local subscription detected for client %s id %d", clientName, req.ClientId)
diff --git a/weed/shell/command_s3_configure.go b/weed/shell/command_s3_configure.go
index ddcafd847..422df2e75 100644
--- a/weed/shell/command_s3_configure.go
+++ b/weed/shell/command_s3_configure.go
@@ -4,11 +4,12 @@ import (
"bytes"
"flag"
"fmt"
- "github.com/chrislusf/seaweedfs/weed/filer"
"io"
"sort"
"strings"
+ "github.com/chrislusf/seaweedfs/weed/filer"
+
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/pb/iam_pb"
)
@@ -164,6 +165,10 @@ func (c *commandS3Configure) Do(args []string, commandEnv *CommandEnv, writer io
s3cfg.Identities = append(s3cfg.Identities, &identity)
}
+ if err = filer.CheckDuplicateAccessKey(s3cfg); err != nil {
+ return err
+ }
+
buf.Reset()
filer.ProtoToText(&buf, s3cfg)
diff --git a/weed/storage/disk_location.go b/weed/storage/disk_location.go
index a2a63acbb..8af8ea663 100644
--- a/weed/storage/disk_location.go
+++ b/weed/storage/disk_location.go
@@ -5,6 +5,7 @@ import (
"os"
"path/filepath"
"runtime"
+ "strconv"
"strings"
"sync"
"time"
@@ -208,8 +209,18 @@ func (l *DiskLocation) concurrentLoadingVolumes(needleMapKind NeedleMapKind, con
func (l *DiskLocation) loadExistingVolumes(needleMapKind NeedleMapKind) {
workerNum := runtime.NumCPU()
- if workerNum <= 10 {
- workerNum = 10
+ val, ok := os.LookupEnv("GOMAXPROCS")
+ if ok {
+ num, err := strconv.Atoi(val)
+ if err != nil || num < 1 {
+ num = 10
+ glog.Warningf("failed to set worker number from GOMAXPROCS , set to default:10")
+ }
+ workerNum = num
+ } else {
+ if workerNum <= 10 {
+ workerNum = 10
+ }
}
l.concurrentLoadingVolumes(needleMapKind, workerNum)
glog.V(0).Infof("Store started on dir: %s with %d volumes max %d", l.Directory, len(l.volumes), l.MaxVolumeCount)